moto/moto/core/models.py

358 lines
10 KiB
Python
Raw Normal View History

from __future__ import unicode_literals
2017-02-16 03:35:45 +00:00
from __future__ import absolute_import
2017-03-12 03:45:42 +00:00
from collections import defaultdict
2013-02-18 21:09:40 +00:00
import functools
import inspect
2013-02-18 21:09:40 +00:00
import re
2017-03-12 03:45:42 +00:00
import six
2013-02-18 21:09:40 +00:00
from moto import settings
2017-02-16 03:35:45 +00:00
from moto.packages.responses import responses
from moto.packages.httpretty import HTTPretty
2017-02-17 03:51:04 +00:00
from .utils import (
convert_httpretty_response,
convert_regex_to_flask_path,
convert_flask_to_responses_response,
)
2013-02-18 21:09:40 +00:00
2017-02-24 02:37:43 +00:00
2017-02-16 03:35:45 +00:00
class BaseMockAWS(object):
nested_count = 0
def __init__(self, backends):
self.backends = backends
self.backends_for_urls = {}
from moto.backends import BACKENDS
default_backends = {
"instance_metadata": BACKENDS['instance_metadata']['global'],
"moto_api": BACKENDS['moto_api']['global'],
}
self.backends_for_urls.update(self.backends)
self.backends_for_urls.update(default_backends)
if self.__class__.nested_count == 0:
2017-02-16 03:35:45 +00:00
self.reset()
2013-02-28 03:25:15 +00:00
2015-06-27 23:01:01 +00:00
def __call__(self, func, reset=True):
if inspect.isclass(func):
return self.decorate_class(func)
2015-06-27 23:01:01 +00:00
return self.decorate_callable(func, reset)
2013-02-28 03:25:15 +00:00
def __enter__(self):
self.start()
def __exit__(self, *args):
self.stop()
2015-04-03 03:40:40 +00:00
def start(self, reset=True):
self.__class__.nested_count += 1
2015-04-03 03:40:40 +00:00
if reset:
for backend in self.backends.values():
backend.reset()
2017-02-16 03:35:45 +00:00
self.enable_patching()
2013-02-28 03:25:15 +00:00
def stop(self):
self.__class__.nested_count -= 1
if self.__class__.nested_count < 0:
raise RuntimeError('Called stop() before start().')
2017-02-20 19:31:19 +00:00
if self.__class__.nested_count == 0:
self.disable_patching()
2013-02-28 03:25:15 +00:00
2015-06-27 23:01:01 +00:00
def decorate_callable(self, func, reset):
2013-02-28 03:25:15 +00:00
def wrapper(*args, **kwargs):
2015-06-27 23:01:01 +00:00
self.start(reset=reset)
try:
2013-02-28 03:25:15 +00:00
result = func(*args, **kwargs)
2015-06-27 23:01:01 +00:00
finally:
self.stop()
2013-02-28 03:25:15 +00:00
return result
functools.update_wrapper(wrapper, func)
wrapper.__wrapped__ = func
2013-02-28 03:25:15 +00:00
return wrapper
def decorate_class(self, klass):
for attr in dir(klass):
if attr.startswith("_"):
continue
attr_value = getattr(klass, attr)
if not hasattr(attr_value, "__call__"):
continue
# Check if this is a classmethod. If so, skip patching
if inspect.ismethod(attr_value) and attr_value.__self__ is klass:
continue
try:
2015-06-27 23:01:01 +00:00
setattr(klass, attr, self(attr_value, reset=False))
except TypeError:
# Sometimes we can't set this for built-in types
continue
return klass
2013-02-28 03:25:15 +00:00
2017-02-16 03:35:45 +00:00
class HttprettyMockAWS(BaseMockAWS):
2017-02-24 02:37:43 +00:00
2017-02-16 03:35:45 +00:00
def reset(self):
HTTPretty.reset()
def enable_patching(self):
if not HTTPretty.is_enabled():
HTTPretty.enable()
for method in HTTPretty.METHODS:
for backend in self.backends_for_urls.values():
for key, value in backend.urls.items():
HTTPretty.register_uri(
method=method,
uri=re.compile(key),
body=convert_httpretty_response(value),
)
2017-02-16 03:35:45 +00:00
def disable_patching(self):
2017-02-20 19:31:19 +00:00
HTTPretty.disable()
HTTPretty.reset()
2017-02-16 03:35:45 +00:00
RESPONSES_METHODS = [responses.GET, responses.DELETE, responses.HEAD,
2017-02-24 02:37:43 +00:00
responses.OPTIONS, responses.PATCH, responses.POST, responses.PUT]
2017-02-16 03:35:45 +00:00
class ResponsesMockAWS(BaseMockAWS):
2017-02-24 02:37:43 +00:00
2017-02-16 03:35:45 +00:00
def reset(self):
responses.reset()
def enable_patching(self):
responses.start()
for method in RESPONSES_METHODS:
for backend in self.backends_for_urls.values():
for key, value in backend.urls.items():
responses.add_callback(
method=method,
url=re.compile(key),
callback=convert_flask_to_responses_response(value),
)
2017-02-16 03:35:45 +00:00
for pattern in responses.mock._urls:
pattern['stream'] = True
def disable_patching(self):
2017-02-20 19:31:19 +00:00
try:
responses.stop()
except AttributeError:
pass
responses.reset()
2017-02-24 02:37:43 +00:00
2017-02-16 03:35:45 +00:00
MockAWS = ResponsesMockAWS
2017-02-20 19:31:19 +00:00
2017-02-20 23:25:10 +00:00
class ServerModeMockAWS(BaseMockAWS):
def reset(self):
import requests
requests.post("http://localhost:8086/moto-api/reset")
def enable_patching(self):
if self.__class__.nested_count == 1:
# Just started
self.reset()
from boto3 import client as real_boto3_client, resource as real_boto3_resource
import mock
def fake_boto3_client(*args, **kwargs):
if 'endpoint_url' not in kwargs:
kwargs['endpoint_url'] = "http://localhost:8086"
return real_boto3_client(*args, **kwargs)
2017-02-24 02:37:43 +00:00
2017-02-20 23:25:10 +00:00
def fake_boto3_resource(*args, **kwargs):
if 'endpoint_url' not in kwargs:
kwargs['endpoint_url'] = "http://localhost:8086"
return real_boto3_resource(*args, **kwargs)
self._client_patcher = mock.patch('boto3.client', fake_boto3_client)
2017-02-24 02:37:43 +00:00
self._resource_patcher = mock.patch(
'boto3.resource', fake_boto3_resource)
2017-02-20 23:25:10 +00:00
self._client_patcher.start()
self._resource_patcher.start()
def disable_patching(self):
if self._client_patcher:
self._client_patcher.stop()
self._resource_patcher.stop()
2017-02-24 02:37:43 +00:00
class Model(type):
2017-02-24 02:37:43 +00:00
def __new__(self, clsname, bases, namespace):
cls = super(Model, self).__new__(self, clsname, bases, namespace)
cls.__models__ = {}
2014-08-26 17:25:50 +00:00
for name, value in namespace.items():
model = getattr(value, "__returns_model__", False)
if model is not False:
cls.__models__[model] = name
for base in bases:
cls.__models__.update(getattr(base, "__models__", {}))
return cls
@staticmethod
def prop(model_name):
""" decorator to mark a class method as returning model values """
def dec(f):
f.__returns_model__ = model_name
return f
return dec
2013-02-18 21:09:40 +00:00
2017-03-12 03:45:42 +00:00
model_data = defaultdict(dict)
class InstanceTrackerMeta(type):
def __new__(meta, name, bases, dct):
cls = super(InstanceTrackerMeta, meta).__new__(meta, name, bases, dct)
if name == 'BaseModel':
return cls
service = cls.__module__.split(".")[1]
if name not in model_data[service]:
model_data[service][name] = cls
cls.instances = []
return cls
@six.add_metaclass(InstanceTrackerMeta)
class BaseModel(object):
def __new__(cls, *args, **kwargs):
2017-03-12 17:04:36 +00:00
if six.PY2:
instance = super(BaseModel, cls).__new__(cls, *args, **kwargs)
else:
instance = super(BaseModel, cls).__new__(cls)
2017-03-12 03:45:42 +00:00
cls.instances.append(instance)
return instance
class BaseBackend(object):
2017-02-24 02:37:43 +00:00
2013-02-18 21:09:40 +00:00
def reset(self):
2017-03-12 04:18:58 +00:00
for service, models in model_data.items():
for model_name, model in models.items():
model.instances = []
2013-02-26 04:48:17 +00:00
self.__dict__ = {}
2013-02-20 03:27:36 +00:00
self.__init__()
2013-02-18 21:09:40 +00:00
@property
2013-03-05 13:14:43 +00:00
def _url_module(self):
2013-02-18 21:09:40 +00:00
backend_module = self.__class__.__module__
backend_urls_module_name = backend_module.replace("models", "urls")
2017-02-24 02:37:43 +00:00
backend_urls_module = __import__(backend_urls_module_name, fromlist=[
'url_bases', 'url_paths'])
2013-03-05 13:14:43 +00:00
return backend_urls_module
@property
def urls(self):
"""
A dictionary of the urls to be mocked with this service and the handlers
that should be called in their place
"""
url_bases = self._url_module.url_bases
unformatted_paths = self._url_module.url_paths
urls = {}
for url_base in url_bases:
2014-08-26 17:25:50 +00:00
for url_path, handler in unformatted_paths.items():
2013-03-05 13:14:43 +00:00
url = url_path.format(url_base)
urls[url] = handler
2013-02-18 21:09:40 +00:00
return urls
2013-03-05 13:14:43 +00:00
@property
def url_paths(self):
"""
A dictionary of the paths of the urls to be mocked with this service and
the handlers that should be called in their place
"""
unformatted_paths = self._url_module.url_paths
paths = {}
2014-08-26 17:25:50 +00:00
for unformatted_path, handler in unformatted_paths.items():
2013-03-05 13:14:43 +00:00
path = unformatted_path.format("")
paths[path] = handler
return paths
@property
def url_bases(self):
"""
A list containing the url_bases extracted from urls.py
"""
return self._url_module.url_bases
2013-03-05 13:14:43 +00:00
@property
def flask_paths(self):
"""
The url paths that will be used for the flask server
"""
paths = {}
2014-08-26 17:25:50 +00:00
for url_path, handler in self.url_paths.items():
2013-03-05 13:14:43 +00:00
url_path = convert_regex_to_flask_path(url_path)
paths[url_path] = handler
return paths
2013-02-28 03:25:15 +00:00
def decorator(self, func=None):
if settings.TEST_SERVER_MODE:
mocked_backend = ServerModeMockAWS({'global': self})
else:
mocked_backend = MockAWS({'global': self})
2013-02-28 03:25:15 +00:00
if func:
return mocked_backend(func)
2013-02-28 03:25:15 +00:00
else:
return mocked_backend
2017-02-12 05:22:29 +00:00
2017-02-16 03:35:45 +00:00
def deprecated_decorator(self, func=None):
if func:
return HttprettyMockAWS({'global': self})(func)
else:
return HttprettyMockAWS({'global': self})
2017-02-12 05:22:29 +00:00
class base_decorator(object):
mock_backend = MockAWS
def __init__(self, backends):
self.backends = backends
def __call__(self, func=None):
if self.mock_backend != HttprettyMockAWS and settings.TEST_SERVER_MODE:
mocked_backend = ServerModeMockAWS(self.backends)
else:
mocked_backend = self.mock_backend(self.backends)
2017-02-20 23:25:10 +00:00
2017-02-12 05:22:29 +00:00
if func:
return mocked_backend(func)
2017-02-12 05:22:29 +00:00
else:
return mocked_backend
2017-02-16 03:35:45 +00:00
class deprecated_base_decorator(base_decorator):
mock_backend = HttprettyMockAWS
2017-02-20 23:25:10 +00:00
class MotoAPIBackend(BaseBackend):
2017-02-24 02:37:43 +00:00
2017-02-20 23:25:10 +00:00
def reset(self):
from moto.backends import BACKENDS
for name, backends in BACKENDS.items():
2017-02-20 23:25:10 +00:00
if name == "moto_api":
continue
for region_name, backend in backends.items():
backend.reset()
2017-02-20 23:25:10 +00:00
self.__init__()
2017-02-24 02:37:43 +00:00
2017-02-20 23:25:10 +00:00
moto_api_backend = MotoAPIBackend()