Feature: Ability to start MotoServer within Python (#4904)
This commit is contained in:
parent
0e3fef9b42
commit
dc5353f1ae
@ -9,5 +9,5 @@ include moto/cognitoidp/resources/*.json
|
||||
include moto/dynamodb2/parsing/reserved_keywords.txt
|
||||
include moto/ssm/resources/*.json
|
||||
include moto/support/resources/*.json
|
||||
recursive-include moto/templates *
|
||||
recursive-include moto/moto_server *
|
||||
recursive-include tests *
|
||||
|
@ -3,6 +3,9 @@
|
||||
.. role:: bash(code)
|
||||
:language: bash
|
||||
|
||||
.. role:: raw-html(raw)
|
||||
:format: html
|
||||
|
||||
================================
|
||||
Non-Python SDK's / Server Mode
|
||||
================================
|
||||
@ -44,6 +47,49 @@ interfaces with 0.0.0.0:
|
||||
Please be aware this might allow other network users to access your
|
||||
server.
|
||||
|
||||
Start within Python
|
||||
--------------------
|
||||
It is possible to start this server from within Python, in a separate thread. :raw-html:`<br />`
|
||||
By default, this server will start on port 5000, but this is configurable.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from moto.server import ThreadedMotoServer
|
||||
server = ThreadedMotoServer()
|
||||
server.start()
|
||||
# run tests
|
||||
client = boto3.client("service", endpoint_url="http://localhost:5000")
|
||||
...
|
||||
server.stop()
|
||||
|
||||
Note that the ThreadedMotoServer and the decorators act on the same state, making it possible to combine the two approaches. :raw-html:`<br />`
|
||||
See the following example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class TestThreadedMotoServer(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.server = ThreadedMotoServer()
|
||||
self.server.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.server.stop()
|
||||
|
||||
@mock_s3
|
||||
def test_load_data_using_decorators(self):
|
||||
server_client = boto3.client("s3", endpoint_url="http://127.0.0.1:5000")
|
||||
server_client.create_bucket(Bucket="test")
|
||||
|
||||
in_mem_client = boto3.client("s3")
|
||||
buckets = in_mem_client.list_buckets()["Buckets"]
|
||||
[b["Name"] for b in buckets].should.equal(["test"])
|
||||
|
||||
This example shows it is possible to create state using the TreadedMotoServer, and access that state using the usual decorators. :raw-html:`<br />`
|
||||
0Note that the decorators will destroy any resources on start, so make sure to not accidentally destroy any resources created by the ThreadedMotoServer that should be kept.
|
||||
|
||||
.. note:: The ThreadedMotoServer is considered in beta for now, and the exact interface and behaviour may still change. :raw-html:`<br />` Please let us know if you'd like to see any changes.
|
||||
|
||||
Run using Docker
|
||||
----------------------
|
||||
You could also use the official Docker image from https://hub.docker.com/r/motoserver/moto/tags:
|
||||
|
48
moto/moto_server/threaded_moto_server.py
Normal file
48
moto/moto_server/threaded_moto_server.py
Normal file
@ -0,0 +1,48 @@
|
||||
import time
|
||||
from threading import Thread
|
||||
from werkzeug.serving import make_server
|
||||
|
||||
from .werkzeug_app import DomainDispatcherApplication, create_backend_app
|
||||
|
||||
|
||||
class ThreadedMotoServer:
|
||||
def __init__(self, ip_address="0.0.0.0", port=5000, verbose=True):
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
"The ThreadedMotoServer is considered in beta for now, and the exact interface and behaviour may still change."
|
||||
)
|
||||
print("Please let us know if you’d like to see any changes.")
|
||||
print("========")
|
||||
|
||||
self._port = port
|
||||
|
||||
self._thread = None
|
||||
self._ip_address = ip_address
|
||||
self._server = None
|
||||
self._server_ready = False
|
||||
self._verbose = verbose
|
||||
|
||||
def _server_entry(self):
|
||||
app = DomainDispatcherApplication(create_backend_app)
|
||||
|
||||
self._server = make_server(self._ip_address, self._port, app, True)
|
||||
self._server_ready = True
|
||||
self._server.serve_forever()
|
||||
|
||||
def start(self):
|
||||
if self._verbose:
|
||||
print(
|
||||
f"Starting a new Thread with MotoServer running on {self._ip_address}:{self._port}..."
|
||||
)
|
||||
self._thread = Thread(target=self._server_entry, daemon=True)
|
||||
self._thread.start()
|
||||
while not self._server_ready:
|
||||
time.sleep(0.5)
|
||||
|
||||
def stop(self):
|
||||
self._server_ready = False
|
||||
if self._server:
|
||||
self._server.shutdown()
|
||||
|
||||
self._thread.join()
|
36
moto/moto_server/utilities.py
Normal file
36
moto/moto_server/utilities.py
Normal file
@ -0,0 +1,36 @@
|
||||
import json
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from urllib.parse import urlencode
|
||||
from werkzeug.routing import BaseConverter
|
||||
|
||||
|
||||
class RegexConverter(BaseConverter):
|
||||
# http://werkzeug.pocoo.org/docs/routing/#custom-converters
|
||||
|
||||
def __init__(self, url_map, *items):
|
||||
super().__init__(url_map)
|
||||
self.regex = items[0]
|
||||
|
||||
|
||||
class AWSTestHelper(FlaskClient):
|
||||
def action_data(self, action_name, **kwargs):
|
||||
"""
|
||||
Method calls resource with action_name and returns data of response.
|
||||
"""
|
||||
opts = {"Action": action_name}
|
||||
opts.update(kwargs)
|
||||
res = self.get(
|
||||
"/?{0}".format(urlencode(opts)),
|
||||
headers={
|
||||
"Host": "{0}.us-east-1.amazonaws.com".format(self.application.service)
|
||||
},
|
||||
)
|
||||
return res.data.decode("utf-8")
|
||||
|
||||
def action_json(self, action_name, **kwargs):
|
||||
"""
|
||||
Method calls resource with action_name and returns object obtained via
|
||||
deserialization of output.
|
||||
"""
|
||||
return json.loads(self.action_data(action_name, **kwargs))
|
292
moto/moto_server/werkzeug_app.py
Normal file
292
moto/moto_server/werkzeug_app.py
Normal file
@ -0,0 +1,292 @@
|
||||
import io
|
||||
import os
|
||||
import os.path
|
||||
from threading import Lock
|
||||
|
||||
from flask import Flask
|
||||
from flask_cors import CORS
|
||||
|
||||
import moto.backends as backends
|
||||
import moto.backend_index as backend_index
|
||||
from moto.core.utils import convert_to_flask_response
|
||||
|
||||
from .utilities import AWSTestHelper, RegexConverter
|
||||
|
||||
HTTP_METHODS = ["GET", "POST", "PUT", "DELETE", "HEAD", "PATCH", "OPTIONS"]
|
||||
|
||||
|
||||
DEFAULT_SERVICE_REGION = ("s3", "us-east-1")
|
||||
|
||||
# Map of unsigned calls to service-region as per AWS API docs
|
||||
# https://docs.aws.amazon.com/cognito/latest/developerguide/resource-permissions.html#amazon-cognito-signed-versus-unsigned-apis
|
||||
UNSIGNED_REQUESTS = {
|
||||
"AWSCognitoIdentityService": ("cognito-identity", "us-east-1"),
|
||||
"AWSCognitoIdentityProviderService": ("cognito-idp", "us-east-1"),
|
||||
}
|
||||
UNSIGNED_ACTIONS = {
|
||||
"AssumeRoleWithSAML": ("sts", "us-east-1"),
|
||||
"AssumeRoleWithWebIdentity": ("sts", "us-east-1"),
|
||||
}
|
||||
|
||||
# Some services have v4 signing names that differ from the backend service name/id.
|
||||
SIGNING_ALIASES = {
|
||||
"eventbridge": "events",
|
||||
"execute-api": "iot",
|
||||
"iotdata": "data.iot",
|
||||
}
|
||||
|
||||
# Some services are only recognizable by the version
|
||||
SERVICE_BY_VERSION = {"2009-04-15": "sdb"}
|
||||
|
||||
|
||||
class DomainDispatcherApplication(object):
|
||||
"""
|
||||
Dispatch requests to different applications based on the "Host:" header
|
||||
value. We'll match the host header value with the url_bases of each backend.
|
||||
"""
|
||||
|
||||
def __init__(self, create_app, service=None):
|
||||
self.create_app = create_app
|
||||
self.lock = Lock()
|
||||
self.app_instances = {}
|
||||
self.service = service
|
||||
self.backend_url_patterns = backend_index.backend_url_patterns
|
||||
|
||||
def get_backend_for_host(self, host):
|
||||
|
||||
if host == "moto_api":
|
||||
return host
|
||||
|
||||
if self.service:
|
||||
return self.service
|
||||
|
||||
if host in backends.BACKENDS:
|
||||
return host
|
||||
|
||||
for backend, pattern in self.backend_url_patterns:
|
||||
if pattern.match("http://%s" % host):
|
||||
return backend
|
||||
|
||||
if "amazonaws.com" in host:
|
||||
print(
|
||||
"Unable to find appropriate backend for {}."
|
||||
"Remember to add the URL to urls.py, and run scripts/update_backend_index.py to index it.".format(
|
||||
host
|
||||
)
|
||||
)
|
||||
|
||||
def infer_service_region_host(self, body, environ):
|
||||
auth = environ.get("HTTP_AUTHORIZATION")
|
||||
target = environ.get("HTTP_X_AMZ_TARGET")
|
||||
service = None
|
||||
if auth:
|
||||
# Signed request
|
||||
# Parse auth header to find service assuming a SigV4 request
|
||||
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
|
||||
# ['Credential=sdffdsa', '20170220', 'us-east-1', 'sns', 'aws4_request']
|
||||
try:
|
||||
credential_scope = auth.split(",")[0].split()[1]
|
||||
_, _, region, service, _ = credential_scope.split("/")
|
||||
service = SIGNING_ALIASES.get(service.lower(), service)
|
||||
service = service.lower()
|
||||
except ValueError:
|
||||
# Signature format does not match, this is exceptional and we can't
|
||||
# infer a service-region. A reduced set of services still use
|
||||
# the deprecated SigV2, ergo prefer S3 as most likely default.
|
||||
# https://docs.aws.amazon.com/general/latest/gr/signature-version-2.html
|
||||
service, region = DEFAULT_SERVICE_REGION
|
||||
else:
|
||||
# Unsigned request
|
||||
action = self.get_action_from_body(body)
|
||||
if target:
|
||||
service, _ = target.split(".", 1)
|
||||
service, region = UNSIGNED_REQUESTS.get(service, DEFAULT_SERVICE_REGION)
|
||||
elif action and action in UNSIGNED_ACTIONS:
|
||||
# See if we can match the Action to a known service
|
||||
service, region = UNSIGNED_ACTIONS.get(action)
|
||||
if not service:
|
||||
service, region = self.get_service_from_body(body, environ)
|
||||
if not service:
|
||||
service, region = self.get_service_from_path(environ)
|
||||
if not service:
|
||||
# S3 is the last resort when the target is also unknown
|
||||
service, region = DEFAULT_SERVICE_REGION
|
||||
|
||||
path = environ.get("PATH_INFO", "")
|
||||
if service in ["budgets", "cloudfront"]:
|
||||
# Global Services - they do not have/expect a region
|
||||
host = f"{service}.amazonaws.com"
|
||||
elif service == "mediastore" and not target:
|
||||
# All MediaStore API calls have a target header
|
||||
# If no target is set, assume we're trying to reach the mediastore-data service
|
||||
host = "data.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "dynamodb":
|
||||
if environ["HTTP_X_AMZ_TARGET"].startswith("DynamoDBStreams"):
|
||||
host = "dynamodbstreams"
|
||||
else:
|
||||
dynamo_api_version = (
|
||||
environ["HTTP_X_AMZ_TARGET"].split("_")[1].split(".")[0]
|
||||
)
|
||||
# If Newer API version, use dynamodb2
|
||||
if dynamo_api_version > "20111205":
|
||||
host = "dynamodb2"
|
||||
elif service == "sagemaker":
|
||||
host = "api.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "timestream":
|
||||
host = "ingest.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "s3" and (
|
||||
path.startswith("/v20180820/") or "s3-control" in environ["HTTP_HOST"]
|
||||
):
|
||||
host = "s3control"
|
||||
else:
|
||||
host = "{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
|
||||
return host
|
||||
|
||||
def get_application(self, environ):
|
||||
path_info = environ.get("PATH_INFO", "")
|
||||
|
||||
# The URL path might contain non-ASCII text, for instance unicode S3 bucket names
|
||||
if isinstance(path_info, bytes):
|
||||
path_info = path_info.decode("utf-8")
|
||||
|
||||
if path_info.startswith("/moto-api") or path_info == "/favicon.ico":
|
||||
host = "moto_api"
|
||||
elif path_info.startswith("/latest/meta-data/"):
|
||||
host = "instance_metadata"
|
||||
else:
|
||||
host = environ["HTTP_HOST"].split(":")[0]
|
||||
|
||||
with self.lock:
|
||||
backend = self.get_backend_for_host(host)
|
||||
if not backend:
|
||||
# No regular backend found; try parsing body/other headers
|
||||
body = self._get_body(environ)
|
||||
host = self.infer_service_region_host(body, environ)
|
||||
backend = self.get_backend_for_host(host)
|
||||
|
||||
app = self.app_instances.get(backend, None)
|
||||
if app is None:
|
||||
app = self.create_app(backend)
|
||||
self.app_instances[backend] = app
|
||||
return app
|
||||
|
||||
def _get_body(self, environ):
|
||||
body = None
|
||||
try:
|
||||
# AWS requests use querystrings as the body (Action=x&Data=y&...)
|
||||
simple_form = environ["CONTENT_TYPE"].startswith(
|
||||
"application/x-www-form-urlencoded"
|
||||
)
|
||||
request_body_size = int(environ["CONTENT_LENGTH"])
|
||||
if simple_form and request_body_size:
|
||||
body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
finally:
|
||||
if body:
|
||||
# We've consumed the body = need to reset it
|
||||
environ["wsgi.input"] = io.StringIO(body)
|
||||
return body
|
||||
|
||||
def get_service_from_body(self, body, environ):
|
||||
# Some services have the SDK Version in the body
|
||||
# If the version is unique, we can derive the service from it
|
||||
version = self.get_version_from_body(body)
|
||||
if version and version in SERVICE_BY_VERSION:
|
||||
# Boto3/1.20.7 Python/3.8.10 Linux/5.11.0-40-generic Botocore/1.23.7 region/eu-west-1
|
||||
region = environ.get("HTTP_USER_AGENT", "").split("/")[-1]
|
||||
return SERVICE_BY_VERSION[version], region
|
||||
return None, None
|
||||
|
||||
def get_version_from_body(self, body):
|
||||
try:
|
||||
body_dict = dict(x.split("=") for x in body.split("&"))
|
||||
return body_dict["Version"]
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None
|
||||
|
||||
def get_action_from_body(self, body):
|
||||
try:
|
||||
# AWS requests use querystrings as the body (Action=x&Data=y&...)
|
||||
body_dict = dict(x.split("=") for x in body.split("&"))
|
||||
return body_dict["Action"]
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None
|
||||
|
||||
def get_service_from_path(self, environ):
|
||||
# Moto sometimes needs to send a HTTP request to itself
|
||||
# In which case it will send a request to 'http://localhost/service_region/whatever'
|
||||
try:
|
||||
path_info = environ.get("PATH_INFO", "/")
|
||||
service, region = path_info[1 : path_info.index("/", 1)].split("_")
|
||||
return service, region
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None, None
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
backend_app = self.get_application(environ)
|
||||
return backend_app(environ, start_response)
|
||||
|
||||
|
||||
def create_backend_app(service):
|
||||
from werkzeug.routing import Map
|
||||
|
||||
current_file = os.path.abspath(__file__)
|
||||
current_dir = os.path.abspath(os.path.join(current_file, os.pardir))
|
||||
template_dir = os.path.join(current_dir, "templates")
|
||||
|
||||
# Create the backend_app
|
||||
backend_app = Flask("moto", template_folder=template_dir)
|
||||
backend_app.debug = True
|
||||
backend_app.service = service
|
||||
CORS(backend_app)
|
||||
|
||||
# Reset view functions to reset the app
|
||||
backend_app.view_functions = {}
|
||||
backend_app.url_map = Map()
|
||||
backend_app.url_map.converters["regex"] = RegexConverter
|
||||
|
||||
backend_dict = backends.get_backend(service)
|
||||
if "us-east-1" in backend_dict:
|
||||
backend = backend_dict["us-east-1"]
|
||||
else:
|
||||
backend = backend_dict["global"]
|
||||
|
||||
for url_path, handler in backend.flask_paths.items():
|
||||
view_func = convert_to_flask_response(handler)
|
||||
if handler.__name__ == "dispatch":
|
||||
endpoint = "{0}.dispatch".format(handler.__self__.__name__)
|
||||
else:
|
||||
endpoint = view_func.__name__
|
||||
|
||||
original_endpoint = endpoint
|
||||
index = 2
|
||||
while endpoint in backend_app.view_functions:
|
||||
# HACK: Sometimes we map the same view to multiple url_paths. Flask
|
||||
# requires us to have different names.
|
||||
endpoint = original_endpoint + str(index)
|
||||
index += 1
|
||||
|
||||
# Some services do not provide a URL path
|
||||
# I.e., boto3 sends a request to 'https://ingest.timestream.amazonaws.com'
|
||||
# Which means we have a empty url_path to catch this request - but Flask can't handle that
|
||||
if url_path:
|
||||
backend_app.add_url_rule(
|
||||
url_path,
|
||||
endpoint=endpoint,
|
||||
methods=HTTP_METHODS,
|
||||
view_func=view_func,
|
||||
strict_slashes=False,
|
||||
)
|
||||
|
||||
backend_app.test_client_class = AWSTestHelper
|
||||
return backend_app
|
326
moto/server.py
326
moto/server.py
@ -1,328 +1,16 @@
|
||||
import argparse
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from threading import Lock
|
||||
|
||||
from flask import Flask
|
||||
from flask_cors import CORS
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from urllib.parse import urlencode
|
||||
from werkzeug.routing import BaseConverter
|
||||
from werkzeug.serving import run_simple
|
||||
|
||||
import moto.backends as backends
|
||||
import moto.backend_index as backend_index
|
||||
from moto.core.utils import convert_to_flask_response
|
||||
|
||||
HTTP_METHODS = ["GET", "POST", "PUT", "DELETE", "HEAD", "PATCH", "OPTIONS"]
|
||||
|
||||
|
||||
DEFAULT_SERVICE_REGION = ("s3", "us-east-1")
|
||||
|
||||
# Map of unsigned calls to service-region as per AWS API docs
|
||||
# https://docs.aws.amazon.com/cognito/latest/developerguide/resource-permissions.html#amazon-cognito-signed-versus-unsigned-apis
|
||||
UNSIGNED_REQUESTS = {
|
||||
"AWSCognitoIdentityService": ("cognito-identity", "us-east-1"),
|
||||
"AWSCognitoIdentityProviderService": ("cognito-idp", "us-east-1"),
|
||||
}
|
||||
UNSIGNED_ACTIONS = {
|
||||
"AssumeRoleWithSAML": ("sts", "us-east-1"),
|
||||
"AssumeRoleWithWebIdentity": ("sts", "us-east-1"),
|
||||
}
|
||||
|
||||
# Some services have v4 signing names that differ from the backend service name/id.
|
||||
SIGNING_ALIASES = {
|
||||
"eventbridge": "events",
|
||||
"execute-api": "iot",
|
||||
"iotdata": "data.iot",
|
||||
}
|
||||
|
||||
# Some services are only recognizable by the version
|
||||
SERVICE_BY_VERSION = {"2009-04-15": "sdb"}
|
||||
|
||||
|
||||
class DomainDispatcherApplication(object):
|
||||
"""
|
||||
Dispatch requests to different applications based on the "Host:" header
|
||||
value. We'll match the host header value with the url_bases of each backend.
|
||||
"""
|
||||
|
||||
def __init__(self, create_app, service=None):
|
||||
self.create_app = create_app
|
||||
self.lock = Lock()
|
||||
self.app_instances = {}
|
||||
self.service = service
|
||||
self.backend_url_patterns = backend_index.backend_url_patterns
|
||||
|
||||
def get_backend_for_host(self, host):
|
||||
|
||||
if host == "moto_api":
|
||||
return host
|
||||
|
||||
if self.service:
|
||||
return self.service
|
||||
|
||||
if host in backends.BACKENDS:
|
||||
return host
|
||||
|
||||
for backend, pattern in self.backend_url_patterns:
|
||||
if pattern.match("http://%s" % host):
|
||||
return backend
|
||||
|
||||
if "amazonaws.com" in host:
|
||||
print(
|
||||
"Unable to find appropriate backend for {}."
|
||||
"Remember to add the URL to urls.py, and run scripts/update_backend_index.py to index it.".format(
|
||||
host
|
||||
)
|
||||
)
|
||||
|
||||
def infer_service_region_host(self, body, environ):
|
||||
auth = environ.get("HTTP_AUTHORIZATION")
|
||||
target = environ.get("HTTP_X_AMZ_TARGET")
|
||||
service = None
|
||||
if auth:
|
||||
# Signed request
|
||||
# Parse auth header to find service assuming a SigV4 request
|
||||
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
|
||||
# ['Credential=sdffdsa', '20170220', 'us-east-1', 'sns', 'aws4_request']
|
||||
try:
|
||||
credential_scope = auth.split(",")[0].split()[1]
|
||||
_, _, region, service, _ = credential_scope.split("/")
|
||||
service = SIGNING_ALIASES.get(service.lower(), service)
|
||||
service = service.lower()
|
||||
except ValueError:
|
||||
# Signature format does not match, this is exceptional and we can't
|
||||
# infer a service-region. A reduced set of services still use
|
||||
# the deprecated SigV2, ergo prefer S3 as most likely default.
|
||||
# https://docs.aws.amazon.com/general/latest/gr/signature-version-2.html
|
||||
service, region = DEFAULT_SERVICE_REGION
|
||||
else:
|
||||
# Unsigned request
|
||||
action = self.get_action_from_body(body)
|
||||
if target:
|
||||
service, _ = target.split(".", 1)
|
||||
service, region = UNSIGNED_REQUESTS.get(service, DEFAULT_SERVICE_REGION)
|
||||
elif action and action in UNSIGNED_ACTIONS:
|
||||
# See if we can match the Action to a known service
|
||||
service, region = UNSIGNED_ACTIONS.get(action)
|
||||
if not service:
|
||||
service, region = self.get_service_from_body(body, environ)
|
||||
if not service:
|
||||
service, region = self.get_service_from_path(environ)
|
||||
if not service:
|
||||
# S3 is the last resort when the target is also unknown
|
||||
service, region = DEFAULT_SERVICE_REGION
|
||||
|
||||
path = environ.get("PATH_INFO", "")
|
||||
if service in ["budgets", "cloudfront"]:
|
||||
# Global Services - they do not have/expect a region
|
||||
host = f"{service}.amazonaws.com"
|
||||
elif service == "mediastore" and not target:
|
||||
# All MediaStore API calls have a target header
|
||||
# If no target is set, assume we're trying to reach the mediastore-data service
|
||||
host = "data.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "dynamodb":
|
||||
if environ["HTTP_X_AMZ_TARGET"].startswith("DynamoDBStreams"):
|
||||
host = "dynamodbstreams"
|
||||
else:
|
||||
dynamo_api_version = (
|
||||
environ["HTTP_X_AMZ_TARGET"].split("_")[1].split(".")[0]
|
||||
)
|
||||
# If Newer API version, use dynamodb2
|
||||
if dynamo_api_version > "20111205":
|
||||
host = "dynamodb2"
|
||||
elif service == "sagemaker":
|
||||
host = "api.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "timestream":
|
||||
host = "ingest.{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
elif service == "s3" and (
|
||||
path.startswith("/v20180820/") or "s3-control" in environ["HTTP_HOST"]
|
||||
):
|
||||
host = "s3control"
|
||||
else:
|
||||
host = "{service}.{region}.amazonaws.com".format(
|
||||
service=service, region=region
|
||||
)
|
||||
|
||||
return host
|
||||
|
||||
def get_application(self, environ):
|
||||
path_info = environ.get("PATH_INFO", "")
|
||||
|
||||
# The URL path might contain non-ASCII text, for instance unicode S3 bucket names
|
||||
if isinstance(path_info, bytes):
|
||||
path_info = path_info.decode("utf-8")
|
||||
|
||||
if path_info.startswith("/moto-api") or path_info == "/favicon.ico":
|
||||
host = "moto_api"
|
||||
elif path_info.startswith("/latest/meta-data/"):
|
||||
host = "instance_metadata"
|
||||
else:
|
||||
host = environ["HTTP_HOST"].split(":")[0]
|
||||
|
||||
with self.lock:
|
||||
backend = self.get_backend_for_host(host)
|
||||
if not backend:
|
||||
# No regular backend found; try parsing body/other headers
|
||||
body = self._get_body(environ)
|
||||
host = self.infer_service_region_host(body, environ)
|
||||
backend = self.get_backend_for_host(host)
|
||||
|
||||
app = self.app_instances.get(backend, None)
|
||||
if app is None:
|
||||
app = self.create_app(backend)
|
||||
self.app_instances[backend] = app
|
||||
return app
|
||||
|
||||
def _get_body(self, environ):
|
||||
body = None
|
||||
try:
|
||||
# AWS requests use querystrings as the body (Action=x&Data=y&...)
|
||||
simple_form = environ["CONTENT_TYPE"].startswith(
|
||||
"application/x-www-form-urlencoded"
|
||||
)
|
||||
request_body_size = int(environ["CONTENT_LENGTH"])
|
||||
if simple_form and request_body_size:
|
||||
body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
finally:
|
||||
if body:
|
||||
# We've consumed the body = need to reset it
|
||||
environ["wsgi.input"] = io.StringIO(body)
|
||||
return body
|
||||
|
||||
def get_service_from_body(self, body, environ):
|
||||
# Some services have the SDK Version in the body
|
||||
# If the version is unique, we can derive the service from it
|
||||
version = self.get_version_from_body(body)
|
||||
if version and version in SERVICE_BY_VERSION:
|
||||
# Boto3/1.20.7 Python/3.8.10 Linux/5.11.0-40-generic Botocore/1.23.7 region/eu-west-1
|
||||
region = environ.get("HTTP_USER_AGENT", "").split("/")[-1]
|
||||
return SERVICE_BY_VERSION[version], region
|
||||
return None, None
|
||||
|
||||
def get_version_from_body(self, body):
|
||||
try:
|
||||
body_dict = dict(x.split("=") for x in body.split("&"))
|
||||
return body_dict["Version"]
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None
|
||||
|
||||
def get_action_from_body(self, body):
|
||||
try:
|
||||
# AWS requests use querystrings as the body (Action=x&Data=y&...)
|
||||
body_dict = dict(x.split("=") for x in body.split("&"))
|
||||
return body_dict["Action"]
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None
|
||||
|
||||
def get_service_from_path(self, environ):
|
||||
# Moto sometimes needs to send a HTTP request to itself
|
||||
# In which case it will send a request to 'http://localhost/service_region/whatever'
|
||||
try:
|
||||
path_info = environ.get("PATH_INFO", "/")
|
||||
service, region = path_info[1 : path_info.index("/", 1)].split("_")
|
||||
return service, region
|
||||
except (AttributeError, KeyError, ValueError):
|
||||
return None, None
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
backend_app = self.get_application(environ)
|
||||
return backend_app(environ, start_response)
|
||||
|
||||
|
||||
class RegexConverter(BaseConverter):
|
||||
# http://werkzeug.pocoo.org/docs/routing/#custom-converters
|
||||
|
||||
def __init__(self, url_map, *items):
|
||||
super().__init__(url_map)
|
||||
self.regex = items[0]
|
||||
|
||||
|
||||
class AWSTestHelper(FlaskClient):
|
||||
def action_data(self, action_name, **kwargs):
|
||||
"""
|
||||
Method calls resource with action_name and returns data of response.
|
||||
"""
|
||||
opts = {"Action": action_name}
|
||||
opts.update(kwargs)
|
||||
res = self.get(
|
||||
"/?{0}".format(urlencode(opts)),
|
||||
headers={
|
||||
"Host": "{0}.us-east-1.amazonaws.com".format(self.application.service)
|
||||
},
|
||||
)
|
||||
return res.data.decode("utf-8")
|
||||
|
||||
def action_json(self, action_name, **kwargs):
|
||||
"""
|
||||
Method calls resource with action_name and returns object obtained via
|
||||
deserialization of output.
|
||||
"""
|
||||
return json.loads(self.action_data(action_name, **kwargs))
|
||||
|
||||
|
||||
def create_backend_app(service):
|
||||
from werkzeug.routing import Map
|
||||
|
||||
# Create the backend_app
|
||||
backend_app = Flask(__name__)
|
||||
backend_app.debug = True
|
||||
backend_app.service = service
|
||||
CORS(backend_app)
|
||||
|
||||
# Reset view functions to reset the app
|
||||
backend_app.view_functions = {}
|
||||
backend_app.url_map = Map()
|
||||
backend_app.url_map.converters["regex"] = RegexConverter
|
||||
|
||||
backend_dict = backends.get_backend(service)
|
||||
if "us-east-1" in backend_dict:
|
||||
backend = backend_dict["us-east-1"]
|
||||
else:
|
||||
backend = backend_dict["global"]
|
||||
|
||||
for url_path, handler in backend.flask_paths.items():
|
||||
view_func = convert_to_flask_response(handler)
|
||||
if handler.__name__ == "dispatch":
|
||||
endpoint = "{0}.dispatch".format(handler.__self__.__name__)
|
||||
else:
|
||||
endpoint = view_func.__name__
|
||||
|
||||
original_endpoint = endpoint
|
||||
index = 2
|
||||
while endpoint in backend_app.view_functions:
|
||||
# HACK: Sometimes we map the same view to multiple url_paths. Flask
|
||||
# requires us to have different names.
|
||||
endpoint = original_endpoint + str(index)
|
||||
index += 1
|
||||
|
||||
# Some services do not provide a URL path
|
||||
# I.e., boto3 sends a request to 'https://ingest.timestream.amazonaws.com'
|
||||
# Which means we have a empty url_path to catch this request - but Flask can't handle that
|
||||
if url_path:
|
||||
backend_app.add_url_rule(
|
||||
url_path,
|
||||
endpoint=endpoint,
|
||||
methods=HTTP_METHODS,
|
||||
view_func=view_func,
|
||||
strict_slashes=False,
|
||||
)
|
||||
|
||||
backend_app.test_client_class = AWSTestHelper
|
||||
return backend_app
|
||||
from moto.moto_server.werkzeug_app import (
|
||||
DomainDispatcherApplication,
|
||||
create_backend_app,
|
||||
)
|
||||
from moto.moto_server.threaded_moto_server import ( # noqa # pylint: disable=unused-import
|
||||
ThreadedMotoServer,
|
||||
)
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
|
81
tests/test_utilities/test_threaded_server.py
Normal file
81
tests/test_utilities/test_threaded_server.py
Normal file
@ -0,0 +1,81 @@
|
||||
import boto3
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
import requests
|
||||
import unittest
|
||||
from moto import mock_s3, settings
|
||||
from moto.server import ThreadedMotoServer
|
||||
from unittest import SkipTest
|
||||
|
||||
|
||||
class TestThreadedMotoServer(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("No point in testing ServerMode within ServerMode")
|
||||
self.server = ThreadedMotoServer(ip_address="127.0.0.1")
|
||||
self.server.start()
|
||||
requests.post("http://localhost:5000/moto-api/reset")
|
||||
|
||||
def tearDown(self):
|
||||
self.server.stop()
|
||||
|
||||
def test_server_is_reachable(self):
|
||||
s3_client = boto3.client("s3", endpoint_url="http://127.0.0.1:5000")
|
||||
s3_client.create_bucket(Bucket="test")
|
||||
buckets = s3_client.list_buckets()["Buckets"]
|
||||
buckets.should.have.length_of(1)
|
||||
[b["Name"] for b in buckets].should.equal(["test"])
|
||||
|
||||
def test_server_can_handle_multiple_services(self):
|
||||
s3_client = boto3.client("s3", endpoint_url="http://127.0.0.1:5000")
|
||||
dynamodb_client = boto3.client("dynamodb", endpoint_url="http://127.0.0.1:5000")
|
||||
s3_client.create_bucket(Bucket="test")
|
||||
dynamodb_client.create_table(
|
||||
TableName="table1",
|
||||
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
|
||||
BillingMode="PAY_PER_REQUEST",
|
||||
)
|
||||
|
||||
buckets = s3_client.list_buckets()["Buckets"]
|
||||
[b["Name"] for b in buckets].should.equal(["test"])
|
||||
|
||||
dynamodb_client.list_tables()["TableNames"].should.equal(["table1"])
|
||||
|
||||
@mock_s3
|
||||
def test_load_data_from_inmemory_client(self):
|
||||
server_client = boto3.client("s3", endpoint_url="http://127.0.0.1:5000")
|
||||
server_client.create_bucket(Bucket="test")
|
||||
|
||||
in_mem_client = boto3.client("s3")
|
||||
buckets = in_mem_client.list_buckets()["Buckets"]
|
||||
[b["Name"] for b in buckets].should.equal(["test"])
|
||||
|
||||
|
||||
def test_threaded_moto_server__different_port():
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("No point in testing ServerMode within ServerMode")
|
||||
server = ThreadedMotoServer(port=5001)
|
||||
server.start()
|
||||
requests.post("http://localhost:5001/moto-api/reset")
|
||||
try:
|
||||
s3_client = boto3.client("s3", endpoint_url="http://127.0.0.1:5001")
|
||||
s3_client.create_bucket(Bucket="test")
|
||||
buckets = s3_client.list_buckets()["Buckets"]
|
||||
[b["Name"] for b in buckets].should.equal(["test"])
|
||||
finally:
|
||||
server.stop()
|
||||
|
||||
|
||||
def test_threaded_moto_server__using_requests():
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("No point in testing ServerMode within ServerMode")
|
||||
server = ThreadedMotoServer(port=5001)
|
||||
server.start()
|
||||
requests.post("http://localhost:5001/moto-api/reset")
|
||||
try:
|
||||
r = requests.get("http://localhost:5001/moto-api")
|
||||
r.content.should.contain(b"<title>Moto</title>")
|
||||
r.status_code.should.equal(200)
|
||||
finally:
|
||||
server.stop()
|
||||
pass
|
Loading…
Reference in New Issue
Block a user