moto/moto/core/models.py
2019-10-31 10:36:05 -07:00

688 lines
22 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import absolute_import
import functools
import inspect
import os
import re
import six
from io import BytesIO
from collections import defaultdict
from botocore.handlers import BUILTIN_HANDLERS
from botocore.awsrequest import AWSResponse
import mock
from moto import settings
import responses
from moto.packages.httpretty import HTTPretty
from .utils import (
convert_httpretty_response,
convert_regex_to_flask_path,
convert_flask_to_responses_response,
)
class BaseMockAWS(object):
nested_count = 0
def __init__(self, backends):
self.backends = backends
self.backends_for_urls = {}
from moto.backends import BACKENDS
default_backends = {
"instance_metadata": BACKENDS["instance_metadata"]["global"],
"moto_api": BACKENDS["moto_api"]["global"],
}
self.backends_for_urls.update(self.backends)
self.backends_for_urls.update(default_backends)
# "Mock" the AWS credentials as they can't be mocked in Botocore currently
FAKE_KEYS = {
"AWS_ACCESS_KEY_ID": "foobar_key",
"AWS_SECRET_ACCESS_KEY": "foobar_secret",
}
self.env_variables_mocks = mock.patch.dict(os.environ, FAKE_KEYS)
if self.__class__.nested_count == 0:
self.reset()
def __call__(self, func, reset=True):
if inspect.isclass(func):
return self.decorate_class(func)
return self.decorate_callable(func, reset)
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
def start(self, reset=True):
self.env_variables_mocks.start()
self.__class__.nested_count += 1
if reset:
for backend in self.backends.values():
backend.reset()
self.enable_patching()
def stop(self):
self.env_variables_mocks.stop()
self.__class__.nested_count -= 1
if self.__class__.nested_count < 0:
raise RuntimeError("Called stop() before start().")
if self.__class__.nested_count == 0:
self.disable_patching()
def decorate_callable(self, func, reset):
def wrapper(*args, **kwargs):
self.start(reset=reset)
try:
result = func(*args, **kwargs)
finally:
self.stop()
return result
functools.update_wrapper(wrapper, func)
wrapper.__wrapped__ = func
return wrapper
def decorate_class(self, klass):
for attr in dir(klass):
if attr.startswith("_"):
continue
attr_value = getattr(klass, attr)
if not hasattr(attr_value, "__call__"):
continue
# Check if this is a classmethod. If so, skip patching
if inspect.ismethod(attr_value) and attr_value.__self__ is klass:
continue
# Check if this is a staticmethod. If so, skip patching
for cls in inspect.getmro(klass):
if attr_value.__name__ not in cls.__dict__:
continue
bound_attr_value = cls.__dict__[attr_value.__name__]
if not isinstance(bound_attr_value, staticmethod):
break
else:
# It is a staticmethod, skip patching
continue
try:
setattr(klass, attr, self(attr_value, reset=False))
except TypeError:
# Sometimes we can't set this for built-in types
continue
return klass
class HttprettyMockAWS(BaseMockAWS):
def reset(self):
HTTPretty.reset()
def enable_patching(self):
if not HTTPretty.is_enabled():
HTTPretty.enable()
for method in HTTPretty.METHODS:
for backend in self.backends_for_urls.values():
for key, value in backend.urls.items():
HTTPretty.register_uri(
method=method,
uri=re.compile(key),
body=convert_httpretty_response(value),
)
def disable_patching(self):
HTTPretty.disable()
HTTPretty.reset()
RESPONSES_METHODS = [
responses.GET,
responses.DELETE,
responses.HEAD,
responses.OPTIONS,
responses.PATCH,
responses.POST,
responses.PUT,
]
class CallbackResponse(responses.CallbackResponse):
"""
Need to subclass so we can change a couple things
"""
def get_response(self, request):
"""
Need to override this so we can pass decode_content=False
"""
headers = self.get_headers()
result = self.callback(request)
if isinstance(result, Exception):
raise result
status, r_headers, body = result
body = responses._handle_body(body)
headers.update(r_headers)
return responses.HTTPResponse(
status=status,
reason=six.moves.http_client.responses.get(status),
body=body,
headers=headers,
preload_content=False,
# Need to not decode_content to mimic requests
decode_content=False,
)
def _url_matches(self, url, other, match_querystring=False):
"""
Need to override this so we can fix querystrings breaking regex matching
"""
if not match_querystring:
other = other.split("?", 1)[0]
if responses._is_string(url):
if responses._has_unicode(url):
url = responses._clean_unicode(url)
if not isinstance(other, six.text_type):
other = other.encode("ascii").decode("utf8")
return self._url_matches_strict(url, other)
elif isinstance(url, responses.Pattern) and url.match(other):
return True
else:
return False
botocore_mock = responses.RequestsMock(
assert_all_requests_are_fired=False,
target="botocore.vendored.requests.adapters.HTTPAdapter.send",
)
responses_mock = responses._default_mock
# Add passthrough to allow any other requests to work
# Since this uses .startswith, it applies to http and https requests.
responses_mock.add_passthru("http")
BOTOCORE_HTTP_METHODS = ["GET", "DELETE", "HEAD", "OPTIONS", "PATCH", "POST", "PUT"]
class MockRawResponse(BytesIO):
def __init__(self, input):
if isinstance(input, six.text_type):
input = input.encode("utf-8")
super(MockRawResponse, self).__init__(input)
def stream(self, **kwargs):
contents = self.read()
while contents:
yield contents
contents = self.read()
class BotocoreStubber(object):
def __init__(self):
self.enabled = False
self.methods = defaultdict(list)
def reset(self):
self.methods.clear()
def register_response(self, method, pattern, response):
matchers = self.methods[method]
matchers.append((pattern, response))
def __call__(self, event_name, request, **kwargs):
if not self.enabled:
return None
response = None
response_callback = None
found_index = None
matchers = self.methods.get(request.method)
base_url = request.url.split("?", 1)[0]
for i, (pattern, callback) in enumerate(matchers):
if pattern.match(base_url):
if found_index is None:
found_index = i
response_callback = callback
else:
matchers.pop(found_index)
break
if response_callback is not None:
for header, value in request.headers.items():
if isinstance(value, six.binary_type):
request.headers[header] = value.decode("utf-8")
status, headers, body = response_callback(
request, request.url, request.headers
)
body = MockRawResponse(body)
response = AWSResponse(request.url, status, headers, body)
return response
botocore_stubber = BotocoreStubber()
BUILTIN_HANDLERS.append(("before-send", botocore_stubber))
def not_implemented_callback(request):
status = 400
headers = {}
response = "The method is not implemented"
return status, headers, response
class BotocoreEventMockAWS(BaseMockAWS):
def reset(self):
botocore_stubber.reset()
responses_mock.reset()
def enable_patching(self):
botocore_stubber.enabled = True
for method in BOTOCORE_HTTP_METHODS:
for backend in self.backends_for_urls.values():
for key, value in backend.urls.items():
pattern = re.compile(key)
botocore_stubber.register_response(method, pattern, value)
if not hasattr(responses_mock, "_patcher") or not hasattr(
responses_mock._patcher, "target"
):
responses_mock.start()
for method in RESPONSES_METHODS:
# for backend in default_backends.values():
for backend in self.backends_for_urls.values():
for key, value in backend.urls.items():
responses_mock.add(
CallbackResponse(
method=method,
url=re.compile(key),
callback=convert_flask_to_responses_response(value),
stream=True,
match_querystring=False,
)
)
responses_mock.add(
CallbackResponse(
method=method,
url=re.compile("https?://.+.amazonaws.com/.*"),
callback=not_implemented_callback,
stream=True,
match_querystring=False,
)
)
botocore_mock.add(
CallbackResponse(
method=method,
url=re.compile("https?://.+.amazonaws.com/.*"),
callback=not_implemented_callback,
stream=True,
match_querystring=False,
)
)
def disable_patching(self):
botocore_stubber.enabled = False
self.reset()
try:
responses_mock.stop()
except RuntimeError:
pass
MockAWS = BotocoreEventMockAWS
class ServerModeMockAWS(BaseMockAWS):
def reset(self):
import requests
requests.post("http://localhost:5000/moto-api/reset")
def enable_patching(self):
if self.__class__.nested_count == 1:
# Just started
self.reset()
from boto3 import client as real_boto3_client, resource as real_boto3_resource
import mock
def fake_boto3_client(*args, **kwargs):
if "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = "http://localhost:5000"
return real_boto3_client(*args, **kwargs)
def fake_boto3_resource(*args, **kwargs):
if "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = "http://localhost:5000"
return real_boto3_resource(*args, **kwargs)
def fake_httplib_send_output(self, message_body=None, *args, **kwargs):
def _convert_to_bytes(mixed_buffer):
bytes_buffer = []
for chunk in mixed_buffer:
if isinstance(chunk, six.text_type):
bytes_buffer.append(chunk.encode("utf-8"))
else:
bytes_buffer.append(chunk)
msg = b"\r\n".join(bytes_buffer)
return msg
self._buffer.extend((b"", b""))
msg = _convert_to_bytes(self._buffer)
del self._buffer[:]
if isinstance(message_body, bytes):
msg += message_body
message_body = None
self.send(msg)
# if self._expect_header_set:
# read, write, exc = select.select([self.sock], [], [self.sock], 1)
# if read:
# self._handle_expect_response(message_body)
# return
if message_body is not None:
self.send(message_body)
self._client_patcher = mock.patch("boto3.client", fake_boto3_client)
self._resource_patcher = mock.patch("boto3.resource", fake_boto3_resource)
if six.PY2:
self._httplib_patcher = mock.patch(
"httplib.HTTPConnection._send_output", fake_httplib_send_output
)
self._client_patcher.start()
self._resource_patcher.start()
if six.PY2:
self._httplib_patcher.start()
def disable_patching(self):
if self._client_patcher:
self._client_patcher.stop()
self._resource_patcher.stop()
if six.PY2:
self._httplib_patcher.stop()
class Model(type):
def __new__(self, clsname, bases, namespace):
cls = super(Model, self).__new__(self, clsname, bases, namespace)
cls.__models__ = {}
for name, value in namespace.items():
model = getattr(value, "__returns_model__", False)
if model is not False:
cls.__models__[model] = name
for base in bases:
cls.__models__.update(getattr(base, "__models__", {}))
return cls
@staticmethod
def prop(model_name):
""" decorator to mark a class method as returning model values """
def dec(f):
f.__returns_model__ = model_name
return f
return dec
model_data = defaultdict(dict)
class InstanceTrackerMeta(type):
def __new__(meta, name, bases, dct):
cls = super(InstanceTrackerMeta, meta).__new__(meta, name, bases, dct)
if name == "BaseModel":
return cls
service = cls.__module__.split(".")[1]
if name not in model_data[service]:
model_data[service][name] = cls
cls.instances = []
return cls
@six.add_metaclass(InstanceTrackerMeta)
class BaseModel(object):
def __new__(cls, *args, **kwargs):
instance = super(BaseModel, cls).__new__(cls)
cls.instances.append(instance)
return instance
class BaseBackend(object):
def _reset_model_refs(self):
# Remove all references to the models stored
for service, models in model_data.items():
for model_name, model in models.items():
model.instances = []
def reset(self):
self._reset_model_refs()
self.__dict__ = {}
self.__init__()
@property
def _url_module(self):
backend_module = self.__class__.__module__
backend_urls_module_name = backend_module.replace("models", "urls")
backend_urls_module = __import__(
backend_urls_module_name, fromlist=["url_bases", "url_paths"]
)
return backend_urls_module
@property
def urls(self):
"""
A dictionary of the urls to be mocked with this service and the handlers
that should be called in their place
"""
url_bases = self._url_module.url_bases
unformatted_paths = self._url_module.url_paths
urls = {}
for url_base in url_bases:
for url_path, handler in unformatted_paths.items():
url = url_path.format(url_base)
urls[url] = handler
return urls
@property
def url_paths(self):
"""
A dictionary of the paths of the urls to be mocked with this service and
the handlers that should be called in their place
"""
unformatted_paths = self._url_module.url_paths
paths = {}
for unformatted_path, handler in unformatted_paths.items():
path = unformatted_path.format("")
paths[path] = handler
return paths
@property
def url_bases(self):
"""
A list containing the url_bases extracted from urls.py
"""
return self._url_module.url_bases
@property
def flask_paths(self):
"""
The url paths that will be used for the flask server
"""
paths = {}
for url_path, handler in self.url_paths.items():
url_path = convert_regex_to_flask_path(url_path)
paths[url_path] = handler
return paths
def decorator(self, func=None):
if settings.TEST_SERVER_MODE:
mocked_backend = ServerModeMockAWS({"global": self})
else:
mocked_backend = MockAWS({"global": self})
if func:
return mocked_backend(func)
else:
return mocked_backend
def deprecated_decorator(self, func=None):
if func:
return HttprettyMockAWS({"global": self})(func)
else:
return HttprettyMockAWS({"global": self})
# def list_config_service_resources(self, resource_ids, resource_name, limit, next_token):
# """For AWS Config. This will list all of the resources of the given type and optional resource name and region"""
# raise NotImplementedError()
class ConfigQueryModel(object):
def __init__(self, backends):
"""Inits based on the resource type's backends (1 for each region if applicable)"""
self.backends = backends
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
):
"""For AWS Config. This will list all of the resources of the given type and optional resource name and region.
This supports both aggregated and non-aggregated listing. The following notes the difference:
- Non-Aggregated Listing -
This only lists resources within a region. The way that this is implemented in moto is based on the region
for the resource backend.
You must set the `backend_region` to the region that the API request arrived from. resource_region can be set to `None`.
- Aggregated Listing -
This lists resources from all potential regional backends. For non-global resource types, this should collect a full
list of resources from all the backends, and then be able to filter from the resource region. This is because an
aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation
from all resources in all regions for a given resource type*.
The `backend_region` should be set to `None` for these queries, and the `resource_region` should optionally be set to
the `Filters` region parameter to filter out resources that reside in a specific region.
For aggregated listings, pagination logic should be set such that the next page can properly span all the region backends.
As such, the proper way to implement is to first obtain a full list of results from all the region backends, and then filter
from there. It may be valuable to make this a concatenation of the region and resource name.
:param resource_region:
:param resource_ids:
:param resource_name:
:param limit:
:param next_token:
:param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query.
:return: This should return a list of Dicts that have the following fields:
[
{
'type': 'AWS::The AWS Config data type',
'name': 'The name of the resource',
'id': 'The ID of the resource',
'region': 'The region of the resource -- if global, then you may want to have the calling logic pass in the
aggregator region in for the resource region -- or just us-east-1 :P'
}
, ...
]
"""
raise NotImplementedError()
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
"""For AWS Config. This will query the backend for the specific resource type configuration.
This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests
will call this function N times to fetch the N objects needing to be fetched.
- Non-Aggregated Fetching -
This only fetches a resource config within a region. The way that this is implemented in moto is based on the region
for the resource backend.
You must set the `backend_region` to the region that the API request arrived from. `resource_region` should be set to `None`.
- Aggregated Fetching -
This fetches resources from all potential regional backends. For non-global resource types, this should collect a full
list of resources from all the backends, and then be able to filter from the resource region. This is because an
aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation
from all resources in all regions for a given resource type*.
...
:param resource_id:
:param resource_name:
:param backend_region:
:param resource_region:
:return:
"""
raise NotImplementedError()
class base_decorator(object):
mock_backend = MockAWS
def __init__(self, backends):
self.backends = backends
def __call__(self, func=None):
if self.mock_backend != HttprettyMockAWS and settings.TEST_SERVER_MODE:
mocked_backend = ServerModeMockAWS(self.backends)
else:
mocked_backend = self.mock_backend(self.backends)
if func:
return mocked_backend(func)
else:
return mocked_backend
class deprecated_base_decorator(base_decorator):
mock_backend = HttprettyMockAWS
class MotoAPIBackend(BaseBackend):
def reset(self):
from moto.backends import BACKENDS
for name, backends in BACKENDS.items():
if name == "moto_api":
continue
for region_name, backend in backends.items():
backend.reset()
self.__init__()
moto_api_backend = MotoAPIBackend()