# -*- coding: utf-8 -*- from __future__ import unicode_literals from __future__ import absolute_import import functools import inspect import os import re import six import types from io import BytesIO from collections import defaultdict from botocore.handlers import BUILTIN_HANDLERS from botocore.awsrequest import AWSResponse import mock from moto import settings import responses from moto.packages.httpretty import HTTPretty from .utils import ( convert_httpretty_response, convert_regex_to_flask_path, convert_flask_to_responses_response, ) ACCOUNT_ID = os.environ.get("MOTO_ACCOUNT_ID", "123456789012") class BaseMockAWS(object): nested_count = 0 def __init__(self, backends): self.backends = backends self.backends_for_urls = {} from moto.backends import BACKENDS default_backends = { "instance_metadata": BACKENDS["instance_metadata"]["global"], "moto_api": BACKENDS["moto_api"]["global"], } self.backends_for_urls.update(self.backends) self.backends_for_urls.update(default_backends) # "Mock" the AWS credentials as they can't be mocked in Botocore currently FAKE_KEYS = { "AWS_ACCESS_KEY_ID": "foobar_key", "AWS_SECRET_ACCESS_KEY": "foobar_secret", } self.default_session_mock = mock.patch("boto3.DEFAULT_SESSION", None) self.env_variables_mocks = mock.patch.dict(os.environ, FAKE_KEYS) if self.__class__.nested_count == 0: self.reset() def __call__(self, func, reset=True): if inspect.isclass(func): return self.decorate_class(func) return self.decorate_callable(func, reset) def __enter__(self): self.start() return self def __exit__(self, *args): self.stop() def start(self, reset=True): self.default_session_mock.start() self.env_variables_mocks.start() self.__class__.nested_count += 1 if reset: for backend in self.backends.values(): backend.reset() self.enable_patching() def stop(self): self.default_session_mock.stop() self.env_variables_mocks.stop() self.__class__.nested_count -= 1 if self.__class__.nested_count < 0: raise RuntimeError("Called stop() before start().") if self.__class__.nested_count == 0: self.disable_patching() def decorate_callable(self, func, reset): def wrapper(*args, **kwargs): self.start(reset=reset) try: result = func(*args, **kwargs) finally: self.stop() return result functools.update_wrapper(wrapper, func) wrapper.__wrapped__ = func return wrapper def decorate_class(self, klass): for attr in dir(klass): if attr.startswith("_"): continue attr_value = getattr(klass, attr) if not hasattr(attr_value, "__call__"): continue # Check if this is a classmethod. If so, skip patching if inspect.ismethod(attr_value) and attr_value.__self__ is klass: continue # Check if this is a staticmethod. If so, skip patching for cls in inspect.getmro(klass): if attr_value.__name__ not in cls.__dict__: continue bound_attr_value = cls.__dict__[attr_value.__name__] if not isinstance(bound_attr_value, staticmethod): break else: # It is a staticmethod, skip patching continue try: setattr(klass, attr, self(attr_value, reset=False)) except TypeError: # Sometimes we can't set this for built-in types continue return klass class HttprettyMockAWS(BaseMockAWS): def reset(self): HTTPretty.reset() def enable_patching(self): if not HTTPretty.is_enabled(): HTTPretty.enable() for method in HTTPretty.METHODS: for backend in self.backends_for_urls.values(): for key, value in backend.urls.items(): HTTPretty.register_uri( method=method, uri=re.compile(key), body=convert_httpretty_response(value), ) def disable_patching(self): HTTPretty.disable() HTTPretty.reset() RESPONSES_METHODS = [ responses.GET, responses.DELETE, responses.HEAD, responses.OPTIONS, responses.PATCH, responses.POST, responses.PUT, ] class CallbackResponse(responses.CallbackResponse): """ Need to subclass so we can change a couple things """ def get_response(self, request): """ Need to override this so we can pass decode_content=False """ headers = self.get_headers() result = self.callback(request) if isinstance(result, Exception): raise result status, r_headers, body = result body = responses._handle_body(body) headers.update(r_headers) return responses.HTTPResponse( status=status, reason=six.moves.http_client.responses.get(status), body=body, headers=headers, preload_content=False, # Need to not decode_content to mimic requests decode_content=False, ) def _url_matches(self, url, other, match_querystring=False): """ Need to override this so we can fix querystrings breaking regex matching """ if not match_querystring: other = other.split("?", 1)[0] if responses._is_string(url): if responses._has_unicode(url): url = responses._clean_unicode(url) if not isinstance(other, six.text_type): other = other.encode("ascii").decode("utf8") return self._url_matches_strict(url, other) elif isinstance(url, responses.Pattern) and url.match(other): return True else: return False botocore_mock = responses.RequestsMock( assert_all_requests_are_fired=False, target="botocore.vendored.requests.adapters.HTTPAdapter.send", ) responses_mock = responses._default_mock # Add passthrough to allow any other requests to work # Since this uses .startswith, it applies to http and https requests. responses_mock.add_passthru("http") def _find_first_match(self, request): for i, match in enumerate(self._matches): if match.matches(request): return match return None # Modify behaviour of the matcher to only/always return the first match # Default behaviour is to return subsequent matches for subsequent requests, which leads to https://github.com/spulec/moto/issues/2567 # - First request matches on the appropriate S3 URL # - Same request, executed again, will be matched on the subsequent match, which happens to be the catch-all, not-yet-implemented, callback # Fix: Always return the first match responses_mock._find_match = types.MethodType(_find_first_match, responses_mock) BOTOCORE_HTTP_METHODS = ["GET", "DELETE", "HEAD", "OPTIONS", "PATCH", "POST", "PUT"] class MockRawResponse(BytesIO): def __init__(self, input): if isinstance(input, six.text_type): input = input.encode("utf-8") super(MockRawResponse, self).__init__(input) def stream(self, **kwargs): contents = self.read() while contents: yield contents contents = self.read() class BotocoreStubber(object): def __init__(self): self.enabled = False self.methods = defaultdict(list) def reset(self): self.methods.clear() def register_response(self, method, pattern, response): matchers = self.methods[method] matchers.append((pattern, response)) def __call__(self, event_name, request, **kwargs): if not self.enabled: return None response = None response_callback = None found_index = None matchers = self.methods.get(request.method) base_url = request.url.split("?", 1)[0] for i, (pattern, callback) in enumerate(matchers): if pattern.match(base_url): if found_index is None: found_index = i response_callback = callback else: matchers.pop(found_index) break if response_callback is not None: for header, value in request.headers.items(): if isinstance(value, six.binary_type): request.headers[header] = value.decode("utf-8") status, headers, body = response_callback( request, request.url, request.headers ) body = MockRawResponse(body) response = AWSResponse(request.url, status, headers, body) return response botocore_stubber = BotocoreStubber() BUILTIN_HANDLERS.append(("before-send", botocore_stubber)) def not_implemented_callback(request): status = 400 headers = {} response = "The method is not implemented" return status, headers, response class BotocoreEventMockAWS(BaseMockAWS): def reset(self): botocore_stubber.reset() responses_mock.reset() def enable_patching(self): botocore_stubber.enabled = True for method in BOTOCORE_HTTP_METHODS: for backend in self.backends_for_urls.values(): for key, value in backend.urls.items(): pattern = re.compile(key) botocore_stubber.register_response(method, pattern, value) if not hasattr(responses_mock, "_patcher") or not hasattr( responses_mock._patcher, "target" ): responses_mock.start() for method in RESPONSES_METHODS: # for backend in default_backends.values(): for backend in self.backends_for_urls.values(): for key, value in backend.urls.items(): responses_mock.add( CallbackResponse( method=method, url=re.compile(key), callback=convert_flask_to_responses_response(value), stream=True, match_querystring=False, ) ) responses_mock.add( CallbackResponse( method=method, url=re.compile(r"https?://.+.amazonaws.com/.*"), callback=not_implemented_callback, stream=True, match_querystring=False, ) ) botocore_mock.add( CallbackResponse( method=method, url=re.compile(r"https?://.+.amazonaws.com/.*"), callback=not_implemented_callback, stream=True, match_querystring=False, ) ) def disable_patching(self): botocore_stubber.enabled = False self.reset() try: responses_mock.stop() except RuntimeError: pass MockAWS = BotocoreEventMockAWS class ServerModeMockAWS(BaseMockAWS): def reset(self): import requests requests.post("http://localhost:5000/moto-api/reset") def enable_patching(self): if self.__class__.nested_count == 1: # Just started self.reset() from boto3 import client as real_boto3_client, resource as real_boto3_resource import mock def fake_boto3_client(*args, **kwargs): if "endpoint_url" not in kwargs: kwargs["endpoint_url"] = "http://localhost:5000" return real_boto3_client(*args, **kwargs) def fake_boto3_resource(*args, **kwargs): if "endpoint_url" not in kwargs: kwargs["endpoint_url"] = "http://localhost:5000" return real_boto3_resource(*args, **kwargs) def fake_httplib_send_output(self, message_body=None, *args, **kwargs): def _convert_to_bytes(mixed_buffer): bytes_buffer = [] for chunk in mixed_buffer: if isinstance(chunk, six.text_type): bytes_buffer.append(chunk.encode("utf-8")) else: bytes_buffer.append(chunk) msg = b"\r\n".join(bytes_buffer) return msg self._buffer.extend((b"", b"")) msg = _convert_to_bytes(self._buffer) del self._buffer[:] if isinstance(message_body, bytes): msg += message_body message_body = None self.send(msg) # if self._expect_header_set: # read, write, exc = select.select([self.sock], [], [self.sock], 1) # if read: # self._handle_expect_response(message_body) # return if message_body is not None: self.send(message_body) self._client_patcher = mock.patch("boto3.client", fake_boto3_client) self._resource_patcher = mock.patch("boto3.resource", fake_boto3_resource) if six.PY2: self._httplib_patcher = mock.patch( "httplib.HTTPConnection._send_output", fake_httplib_send_output ) self._client_patcher.start() self._resource_patcher.start() if six.PY2: self._httplib_patcher.start() def disable_patching(self): if self._client_patcher: self._client_patcher.stop() self._resource_patcher.stop() if six.PY2: self._httplib_patcher.stop() class Model(type): def __new__(self, clsname, bases, namespace): cls = super(Model, self).__new__(self, clsname, bases, namespace) cls.__models__ = {} for name, value in namespace.items(): model = getattr(value, "__returns_model__", False) if model is not False: cls.__models__[model] = name for base in bases: cls.__models__.update(getattr(base, "__models__", {})) return cls @staticmethod def prop(model_name): """ decorator to mark a class method as returning model values """ def dec(f): f.__returns_model__ = model_name return f return dec model_data = defaultdict(dict) class InstanceTrackerMeta(type): def __new__(meta, name, bases, dct): cls = super(InstanceTrackerMeta, meta).__new__(meta, name, bases, dct) if name == "BaseModel": return cls service = cls.__module__.split(".")[1] if name not in model_data[service]: model_data[service][name] = cls cls.instances = [] return cls @six.add_metaclass(InstanceTrackerMeta) class BaseModel(object): def __new__(cls, *args, **kwargs): instance = super(BaseModel, cls).__new__(cls) cls.instances.append(instance) return instance class BaseBackend(object): def _reset_model_refs(self): # Remove all references to the models stored for service, models in model_data.items(): for model_name, model in models.items(): model.instances = [] def reset(self): self._reset_model_refs() self.__dict__ = {} self.__init__() @property def _url_module(self): backend_module = self.__class__.__module__ backend_urls_module_name = backend_module.replace("models", "urls") backend_urls_module = __import__( backend_urls_module_name, fromlist=["url_bases", "url_paths"] ) return backend_urls_module @property def urls(self): """ A dictionary of the urls to be mocked with this service and the handlers that should be called in their place """ url_bases = self._url_module.url_bases unformatted_paths = self._url_module.url_paths urls = {} for url_base in url_bases: for url_path, handler in unformatted_paths.items(): url = url_path.format(url_base) urls[url] = handler return urls @property def url_paths(self): """ A dictionary of the paths of the urls to be mocked with this service and the handlers that should be called in their place """ unformatted_paths = self._url_module.url_paths paths = {} for unformatted_path, handler in unformatted_paths.items(): path = unformatted_path.format("") paths[path] = handler return paths @property def url_bases(self): """ A list containing the url_bases extracted from urls.py """ return self._url_module.url_bases @property def flask_paths(self): """ The url paths that will be used for the flask server """ paths = {} for url_path, handler in self.url_paths.items(): url_path = convert_regex_to_flask_path(url_path) paths[url_path] = handler return paths def decorator(self, func=None): if settings.TEST_SERVER_MODE: mocked_backend = ServerModeMockAWS({"global": self}) else: mocked_backend = MockAWS({"global": self}) if func: return mocked_backend(func) else: return mocked_backend def deprecated_decorator(self, func=None): if func: return HttprettyMockAWS({"global": self})(func) else: return HttprettyMockAWS({"global": self}) # def list_config_service_resources(self, resource_ids, resource_name, limit, next_token): # """For AWS Config. This will list all of the resources of the given type and optional resource name and region""" # raise NotImplementedError() class ConfigQueryModel(object): def __init__(self, backends): """Inits based on the resource type's backends (1 for each region if applicable)""" self.backends = backends def list_config_service_resources( self, resource_ids, resource_name, limit, next_token, backend_region=None, resource_region=None, ): """For AWS Config. This will list all of the resources of the given type and optional resource name and region. This supports both aggregated and non-aggregated listing. The following notes the difference: - Non-Aggregated Listing - This only lists resources within a region. The way that this is implemented in moto is based on the region for the resource backend. You must set the `backend_region` to the region that the API request arrived from. resource_region can be set to `None`. - Aggregated Listing - This lists resources from all potential regional backends. For non-global resource types, this should collect a full list of resources from all the backends, and then be able to filter from the resource region. This is because an aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation from all resources in all regions for a given resource type*. The `backend_region` should be set to `None` for these queries, and the `resource_region` should optionally be set to the `Filters` region parameter to filter out resources that reside in a specific region. For aggregated listings, pagination logic should be set such that the next page can properly span all the region backends. As such, the proper way to implement is to first obtain a full list of results from all the region backends, and then filter from there. It may be valuable to make this a concatenation of the region and resource name. :param resource_ids: A list of resource IDs :param resource_name: The individual name of a resource :param limit: How many per page :param next_token: The item that will page on :param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query. :param resource_region: The region for where the resources reside to pull results from. Set to `None` if this is a non-aggregated query. :return: This should return a list of Dicts that have the following fields: [ { 'type': 'AWS::The AWS Config data type', 'name': 'The name of the resource', 'id': 'The ID of the resource', 'region': 'The region of the resource -- if global, then you may want to have the calling logic pass in the aggregator region in for the resource region -- or just us-east-1 :P' } , ... ] """ raise NotImplementedError() def get_config_resource( self, resource_id, resource_name=None, backend_region=None, resource_region=None ): """For AWS Config. This will query the backend for the specific resource type configuration. This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests will call this function N times to fetch the N objects needing to be fetched. - Non-Aggregated Fetching - This only fetches a resource config within a region. The way that this is implemented in moto is based on the region for the resource backend. You must set the `backend_region` to the region that the API request arrived from. `resource_region` should be set to `None`. - Aggregated Fetching - This fetches resources from all potential regional backends. For non-global resource types, this should collect a full list of resources from all the backends, and then be able to filter from the resource region. This is because an aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation from all resources in all regions for a given resource type*. ... :param resource_id: :param resource_name: :param backend_region: :param resource_region: :return: """ raise NotImplementedError() class base_decorator(object): mock_backend = MockAWS def __init__(self, backends): self.backends = backends def __call__(self, func=None): if self.mock_backend != HttprettyMockAWS and settings.TEST_SERVER_MODE: mocked_backend = ServerModeMockAWS(self.backends) else: mocked_backend = self.mock_backend(self.backends) if func: return mocked_backend(func) else: return mocked_backend class deprecated_base_decorator(base_decorator): mock_backend = HttprettyMockAWS class MotoAPIBackend(BaseBackend): def reset(self): from moto.backends import BACKENDS for name, backends in BACKENDS.items(): if name == "moto_api": continue for region_name, backend in backends.items(): backend.reset() self.__init__() moto_api_backend = MotoAPIBackend()