MotoProxy - configure passthrough options (#7388)
This commit is contained in:
parent
6f797c2f00
commit
375c9efa86
2
.github/workflows/tests_proxymode.yml
vendored
2
.github/workflows/tests_proxymode.yml
vendored
@ -41,7 +41,7 @@ jobs:
|
||||
env:
|
||||
TEST_PROXY_MODE: ${{ true }}
|
||||
run: |
|
||||
pytest -sv tests/test_acmpca tests/test_awslambda tests/test_apigateway tests/test_s3
|
||||
pytest -sv tests/test_acmpca tests/test_awslambda tests/test_apigateway tests/test_core/test_proxy.py tests/test_s3
|
||||
- name: "Stop MotoProxy"
|
||||
if: always()
|
||||
run: |
|
||||
|
@ -119,11 +119,31 @@ Terraform Configuration
|
||||
}
|
||||
|
||||
|
||||
Drawbacks
|
||||
------------
|
||||
URL Passthroughs
|
||||
----------------
|
||||
|
||||
Configuring a proxy means that all requests are intercepted, but the MotoProxy can only handle requests to AWS.
|
||||
If some URL's should not be intercepted, you can configure the MotoProxy to pass them through.
|
||||
|
||||
If your test includes a call to `https://www.thirdpartyservice.com`, that will also be intercepted by `MotoProxy` - and subsequently throw an error because it doesn't know how to handle non-AWS requests.
|
||||
To do so, make the following HTTP request:
|
||||
|
||||
.. code-block::
|
||||
|
||||
config_url = "http://motoapi.amazonaws.com/moto-api/proxy/passthrough"
|
||||
proxies = {"http": "http://localhost:5005", "https": "http://localhost:5005"}
|
||||
|
||||
http_url = "http://some_website.com/path"
|
||||
https_host = "google.com"
|
||||
config = {"http_urls": [http_url], "https_hosts": [https_host]}
|
||||
|
||||
requests.post(config_url, json=config, proxies=proxies)
|
||||
|
||||
Note the difference between `http_url` and `https_hosts`. You can configure a full URL to intercept **if and only if** it is a HTTP (unsecured) url.
|
||||
|
||||
If you want to passthrough a request to a HTTPS endpoint, you have to specify the HTTPS host. Say you want to make a request to `https://companywebsite.com/mydata`, the `https_host` would have to be set to `companywebsite.com`.
|
||||
|
||||
All HTTPS requests to this domain will be intercepted.
|
||||
|
||||
Alternative Passthrough
|
||||
-----------------------
|
||||
|
||||
If your test setup supports the `NO_PROXY` environment variable, you could exclude `www.thirdpartyservice.com` from being proxied by setting `NO_PROXY=www.thirdpartyservice.com`. `NO_PROXY` accepts a comma separated list of domains, e.g. `NO_PROXY=.thirdpartyservice.com,api.anotherservice.com`.
|
||||
|
@ -1,4 +1,4 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
from moto.core import DEFAULT_ACCOUNT_ID
|
||||
from moto.core.base_backend import BackendDict, BaseBackend
|
||||
@ -6,6 +6,11 @@ from moto.core.model_instances import reset_model_data
|
||||
|
||||
|
||||
class MotoAPIBackend(BaseBackend):
|
||||
def __init__(self, region_name: str, account_id: str):
|
||||
super().__init__(region_name, account_id)
|
||||
self.proxy_urls_to_passthrough: Set[str] = set()
|
||||
self.proxy_hosts_to_passthrough: Set[str] = set()
|
||||
|
||||
def reset(self) -> None:
|
||||
region_name = self.region_name
|
||||
account_id = self.account_id
|
||||
@ -96,5 +101,20 @@ class MotoAPIBackend(BaseBackend):
|
||||
backend = inspector2_backends[account_id][region]
|
||||
backend.findings_queue.append(results)
|
||||
|
||||
def get_proxy_passthrough(self) -> Tuple[Set[str], Set[str]]:
|
||||
return self.proxy_urls_to_passthrough, self.proxy_hosts_to_passthrough
|
||||
|
||||
def set_proxy_passthrough(
|
||||
self, http_urls: List[str], https_hosts: List[str]
|
||||
) -> None:
|
||||
for url in http_urls:
|
||||
self.proxy_urls_to_passthrough.add(url)
|
||||
for host in https_hosts:
|
||||
self.proxy_hosts_to_passthrough.add(host)
|
||||
|
||||
def delete_proxy_passthroughs(self) -> None:
|
||||
self.proxy_urls_to_passthrough.clear()
|
||||
self.proxy_hosts_to_passthrough.clear()
|
||||
|
||||
|
||||
moto_api_backend = MotoAPIBackend(region_name="global", account_id=DEFAULT_ACCOUNT_ID)
|
||||
|
@ -1,6 +1,8 @@
|
||||
import json
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from botocore.awsrequest import AWSPreparedRequest
|
||||
|
||||
from moto import settings
|
||||
from moto.core import DEFAULT_ACCOUNT_ID
|
||||
from moto.core.common_types import TYPE_RESPONSE
|
||||
@ -110,9 +112,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
model_name = body["model_name"]
|
||||
transition = body["transition"]
|
||||
|
||||
@ -127,9 +127,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
model_name = body["model_name"]
|
||||
|
||||
moto_api_backend.unset_transition(model_name)
|
||||
@ -151,9 +149,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
|
||||
region = body.get("region", "us-east-1")
|
||||
|
||||
@ -176,9 +172,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
|
||||
|
||||
for result in body.get("results", []):
|
||||
@ -193,9 +187,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
|
||||
region = body.get("region", "us-east-1")
|
||||
|
||||
@ -222,9 +214,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
|
||||
region = body.get("region", "us-east-1")
|
||||
|
||||
@ -253,9 +243,7 @@ class MotoAPIResponse(BaseResponse):
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
body = json.loads(body)
|
||||
body = self._get_body(headers, request)
|
||||
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
|
||||
region = body.get("region", "us-east-1")
|
||||
|
||||
@ -266,3 +254,34 @@ class MotoAPIResponse(BaseResponse):
|
||||
region=region,
|
||||
)
|
||||
return 201, {}, ""
|
||||
|
||||
def set_proxy_passthrough(
|
||||
self,
|
||||
request: Any,
|
||||
full_url: str, # pylint: disable=unused-argument
|
||||
headers: Any,
|
||||
) -> TYPE_RESPONSE:
|
||||
from .models import moto_api_backend
|
||||
|
||||
res_headers = {"Content-Type": "application/json"}
|
||||
|
||||
if request.method == "POST":
|
||||
body = self._get_body(headers, request)
|
||||
http_urls = body.get("http_urls", [])
|
||||
https_hosts = body.get("https_hosts", [])
|
||||
moto_api_backend.set_proxy_passthrough(http_urls, https_hosts)
|
||||
if request.method == "DELETE":
|
||||
moto_api_backend.delete_proxy_passthroughs()
|
||||
|
||||
urls, hosts = moto_api_backend.get_proxy_passthrough()
|
||||
resp = {"http_urls": list(urls), "https_hosts": list(hosts)}
|
||||
return 201, res_headers, json.dumps(resp).encode("utf-8")
|
||||
|
||||
def _get_body(self, headers: Any, request: Any) -> Any:
|
||||
if isinstance(request, AWSPreparedRequest):
|
||||
return json.loads(request.body) # type: ignore[arg-type]
|
||||
else:
|
||||
# Werkzeug request
|
||||
request_body_size = int(headers["Content-Length"])
|
||||
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||
return json.loads(body)
|
||||
|
@ -12,6 +12,7 @@ url_paths = {
|
||||
"{0}/moto-api/reset": response_instance.reset_response,
|
||||
"{0}/moto-api/reset-auth": response_instance.reset_auth_response,
|
||||
"{0}/moto-api/seed": response_instance.seed,
|
||||
"{0}/moto-api/proxy/passthrough": response_instance.set_proxy_passthrough,
|
||||
"{0}/moto-api/static/athena/query-results": response_instance.set_athena_result,
|
||||
"{0}/moto-api/static/ce/cost-and-usage-results": response_instance.set_ce_cost_usage_result,
|
||||
"{0}/moto-api/static/inspector2/findings-results": response_instance.set_inspector2_findings_result,
|
||||
|
@ -1,11 +1,13 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
import select
|
||||
import socket
|
||||
import ssl
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from subprocess import CalledProcessError, check_output
|
||||
from threading import Lock
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from botocore.awsrequest import AWSPreparedRequest
|
||||
|
||||
@ -14,6 +16,7 @@ from moto.backends import get_backend
|
||||
from moto.core import DEFAULT_ACCOUNT_ID
|
||||
from moto.core.base_backend import BackendDict
|
||||
from moto.core.exceptions import RESTError
|
||||
from moto.moto_api._internal.models import moto_api_backend
|
||||
|
||||
from . import debug, error, info, with_color
|
||||
from .certificate_creator import CertificateCreator
|
||||
@ -39,6 +42,8 @@ class MotoRequestHandler:
|
||||
# We do not match against URL parameters
|
||||
path = path.split("?")[0]
|
||||
backend_name = self.get_backend_for_host(host)
|
||||
if not backend_name:
|
||||
return None
|
||||
backend_dict = get_backend(backend_name)
|
||||
|
||||
# Get an instance of this backend.
|
||||
@ -67,6 +72,8 @@ class MotoRequestHandler:
|
||||
form_data: Dict[str, Any],
|
||||
) -> Any:
|
||||
handler = self.get_handler_for_host(host=host, path=path)
|
||||
if handler is None:
|
||||
return 404, {}, b"AWS Service not recognized or supported"
|
||||
full_url = host + path
|
||||
request = AWSPreparedRequest(
|
||||
method, full_url, headers, body, stream_output=False
|
||||
@ -100,8 +107,13 @@ class ProxyRequestHandler(BaseHTTPRequestHandler):
|
||||
raise
|
||||
|
||||
def do_CONNECT(self) -> None:
|
||||
certpath = self.cert_creator.create(self.path)
|
||||
address = self.path.split(":")
|
||||
port = int(address[1]) or 443 # type: ignore
|
||||
if address[0] in moto_api_backend.proxy_hosts_to_passthrough:
|
||||
self.connect_relay((address[0], port))
|
||||
return
|
||||
|
||||
certpath = self.cert_creator.create(self.path)
|
||||
self.wfile.write(
|
||||
f"{self.protocol_version} 200 Connection Established\r\n".encode("utf-8")
|
||||
)
|
||||
@ -129,6 +141,13 @@ class ProxyRequestHandler(BaseHTTPRequestHandler):
|
||||
|
||||
def do_GET(self) -> None:
|
||||
req = self
|
||||
host, path = self._get_host_and_path(req)
|
||||
|
||||
if f"{host}{path}" in moto_api_backend.proxy_urls_to_passthrough:
|
||||
parsed = urlparse(host)
|
||||
self.passthrough_http((parsed.netloc, 80))
|
||||
return
|
||||
|
||||
req_body = b""
|
||||
if "Content-Length" in req.headers:
|
||||
content_length = int(req.headers["Content-Length"])
|
||||
@ -145,11 +164,6 @@ class ProxyRequestHandler(BaseHTTPRequestHandler):
|
||||
form_data = {}
|
||||
|
||||
req_body = self.decode_request_body(req.headers, req_body) # type: ignore
|
||||
if isinstance(self.connection, ssl.SSLSocket):
|
||||
host = "https://" + req.headers["Host"]
|
||||
else:
|
||||
host = "http://" + req.headers["Host"]
|
||||
path = req.path
|
||||
|
||||
try:
|
||||
info(f"{with_color(33, req.command.upper())} {host}{path}") # noqa
|
||||
@ -207,6 +221,51 @@ class ProxyRequestHandler(BaseHTTPRequestHandler):
|
||||
self.wfile.write(res_body)
|
||||
self.close_connection = True
|
||||
|
||||
def _get_host_and_path(self, req: Any) -> Tuple[str, str]:
|
||||
if isinstance(self.connection, ssl.SSLSocket):
|
||||
host = "https://" + req.headers["Host"]
|
||||
else:
|
||||
host = "http://" + req.headers["Host"]
|
||||
path = req.path
|
||||
if path.startswith(host):
|
||||
path = path[len(host) :]
|
||||
return host, path
|
||||
|
||||
def passthrough_http(self, address: Tuple[str, int]) -> None:
|
||||
s = socket.create_connection(address, timeout=self.timeout)
|
||||
s.send(self.raw_requestline) # type: ignore[attr-defined]
|
||||
for key, val in self.headers.items():
|
||||
s.send(f"{key}: {val}\r\n".encode("utf-8"))
|
||||
s.send(b"\r\n")
|
||||
while True:
|
||||
data = s.recv(1024)
|
||||
if not data:
|
||||
break
|
||||
self.wfile.write(data)
|
||||
|
||||
def connect_relay(self, address: Tuple[str, int]) -> None:
|
||||
try:
|
||||
s = socket.create_connection(address, timeout=self.timeout)
|
||||
except Exception:
|
||||
self.send_error(502)
|
||||
return
|
||||
self.send_response(200, "Connection Established")
|
||||
self.end_headers()
|
||||
|
||||
conns = [self.connection, s]
|
||||
self.close_connection = False
|
||||
while not self.close_connection:
|
||||
rlist, wlist, xlist = select.select(conns, [], conns, self.timeout)
|
||||
if xlist or not rlist:
|
||||
break
|
||||
for r in rlist:
|
||||
other = conns[1] if r is conns[0] else conns[0]
|
||||
data = r.recv(8192)
|
||||
if not data:
|
||||
self.close_connection = True
|
||||
break
|
||||
other.sendall(data)
|
||||
|
||||
def read_chunked_body(self, reader: Any) -> bytes:
|
||||
chunked_body = b""
|
||||
while True:
|
||||
|
127
tests/test_core/test_proxy.py
Normal file
127
tests/test_core/test_proxy.py
Normal file
@ -0,0 +1,127 @@
|
||||
from unittest import SkipTest
|
||||
|
||||
import requests
|
||||
|
||||
from moto import settings
|
||||
|
||||
url = "http://motoapi.amazonaws.com/moto-api/proxy/passthrough"
|
||||
|
||||
|
||||
def test_real_request_errors() -> None:
|
||||
if not settings.is_test_proxy_mode():
|
||||
raise SkipTest("Can only be tested in ProxyMode")
|
||||
|
||||
http_proxy = settings.test_proxy_mode_endpoint()
|
||||
https_proxy = settings.test_proxy_mode_endpoint()
|
||||
proxies = {"http": http_proxy, "https": https_proxy}
|
||||
|
||||
# Delete all to ensure we're starting with a clean slate
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
resp = requests.get("http://httpbin.org/robots.txt", proxies=proxies)
|
||||
assert resp.status_code == 404
|
||||
assert resp.content == b"AWS Service not recognized or supported"
|
||||
|
||||
|
||||
def test_configure_passedthrough_urls() -> None:
|
||||
if not settings.is_test_proxy_mode():
|
||||
raise SkipTest("Can only be tested in ProxyMode")
|
||||
|
||||
http_proxy = settings.test_proxy_mode_endpoint()
|
||||
https_proxy = settings.test_proxy_mode_endpoint()
|
||||
proxies = {"http": http_proxy, "https": https_proxy}
|
||||
|
||||
# Delete all to ensure we're starting with a clean slate
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
target1 = "http://httpbin.org/robots.txt"
|
||||
target2 = "http://othersite.org/"
|
||||
target3 = "https://othersite.org/"
|
||||
resp = requests.post(url, json={"http_urls": [target1]}, proxies=proxies)
|
||||
assert resp.status_code == 201
|
||||
assert resp.json() == {"http_urls": [target1], "https_hosts": []}
|
||||
|
||||
# We can configure multiple URL's
|
||||
resp = requests.post(url, json={"http_urls": [target2]}, proxies=proxies)
|
||||
assert target1 in resp.json()["http_urls"]
|
||||
assert target2 in resp.json()["http_urls"]
|
||||
|
||||
# Duplicate URL's are ignored
|
||||
requests.post(url, json={"http_urls": [target1]}, proxies=proxies)
|
||||
|
||||
# We can retrieve the data
|
||||
resp = requests.get(url, proxies=proxies)
|
||||
assert target1 in resp.json()["http_urls"]
|
||||
assert target2 in resp.json()["http_urls"]
|
||||
assert resp.json()["https_hosts"] == []
|
||||
|
||||
# Set HTTPS HOST for good measure
|
||||
resp = requests.post(url, json={"https_hosts": [target3]}, proxies=proxies)
|
||||
assert target1 in resp.json()["http_urls"]
|
||||
assert target2 in resp.json()["http_urls"]
|
||||
assert resp.json()["https_hosts"] == [target3]
|
||||
|
||||
# We can delete all URL's in one go
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
resp = requests.get(url, proxies=proxies)
|
||||
assert resp.json() == {"http_urls": [], "https_hosts": []}
|
||||
|
||||
|
||||
def test_http_get_request_can_be_passed_through() -> None:
|
||||
if not settings.is_test_proxy_mode():
|
||||
raise SkipTest("Can only be tested in ProxyMode")
|
||||
|
||||
http_proxy = settings.test_proxy_mode_endpoint()
|
||||
https_proxy = settings.test_proxy_mode_endpoint()
|
||||
proxies = {"http": http_proxy, "https": https_proxy}
|
||||
|
||||
# Delete all to ensure we're starting with a clean slate
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
# Configure our URL as the one to passthrough
|
||||
target_url = "http://httpbin.org/robots.txt"
|
||||
requests.post(url, json={"http_urls": [target_url]}, proxies=proxies)
|
||||
|
||||
resp = requests.get("http://httpbin.org/robots.txt", proxies=proxies)
|
||||
assert resp.status_code == 200
|
||||
assert b"/deny" in resp.content
|
||||
|
||||
|
||||
def test_http_post_request_can_be_passed_through() -> None:
|
||||
if not settings.is_test_proxy_mode():
|
||||
raise SkipTest("Can only be tested in ProxyMode")
|
||||
|
||||
http_proxy = settings.test_proxy_mode_endpoint()
|
||||
https_proxy = settings.test_proxy_mode_endpoint()
|
||||
proxies = {"http": http_proxy, "https": https_proxy}
|
||||
|
||||
# Delete all to ensure we're starting with a clean slate
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
# Configure our URL as the one to passthrough
|
||||
target_url = "http://httpbin.org/response-headers?x-moto-test=someval"
|
||||
requests.post(url, json={"http_urls": [target_url]}, proxies=proxies)
|
||||
|
||||
resp = requests.post(target_url, proxies=proxies)
|
||||
assert "x-moto-test" in resp.json()
|
||||
|
||||
|
||||
def test_https_request_can_be_passed_through() -> None:
|
||||
if not settings.is_test_proxy_mode():
|
||||
raise SkipTest("Can only be tested in ProxyMode")
|
||||
|
||||
http_proxy = settings.test_proxy_mode_endpoint()
|
||||
https_proxy = settings.test_proxy_mode_endpoint()
|
||||
proxies = {"http": http_proxy, "https": https_proxy}
|
||||
|
||||
# Delete all to ensure we're starting with a clean slate
|
||||
requests.delete(url, proxies=proxies)
|
||||
|
||||
# Configure our URL as the one to passthrough
|
||||
target_url = "https://httpbin.org/ip"
|
||||
requests.post(url, json={"https_hosts": ["httpbin.org"]}, proxies=proxies)
|
||||
|
||||
resp = requests.get(target_url, proxies=proxies)
|
||||
assert resp.status_code == 200
|
||||
assert "origin" in resp.json()
|
Loading…
Reference in New Issue
Block a user