Feature: Allow requests to passthrough to AWS (#7200)

This commit is contained in:
Bert Blommers 2024-01-13 18:38:05 +00:00
parent 0acf4ff847
commit 168b869350
10 changed files with 292 additions and 33 deletions

View File

@ -12,9 +12,24 @@ If you are using the decorators, some options are configurable within the decora
@mock_aws(config={
"batch": {"use_docker": True},
"lambda": {"use_docker": True}
"lambda": {"use_docker": True},
"core": {
"mock_credentials": True,
"passthrough": {
"urls": ["s3.amazonaws.com/bucket*"],
"services": ["dynamodb"]
}
}
})
By default, Batch and AWSLambda will spin up a Docker image to execute the provided scripts and functions.
If you configure `use_docker: False` for either of these services, the scripts and functions will not be executed, and Moto will assume a successful invocation.
Configure `mock_credentials: False` and `passthrough` if you want to only mock some services, but allow other requests to connect to AWS.
You can either passthrough all requests to a specific service, or all URL's that match a specific pattern.
.. toctree::
:maxdepth: 1

View File

@ -1,6 +1,7 @@
from typing import TYPE_CHECKING, Callable, Dict, Optional, TypeVar, Union, overload
from typing import TYPE_CHECKING, Callable, Optional, TypeVar, Union, overload
from moto import settings
from moto.core.config import DefaultConfig
from moto.core.models import MockAWS, ProxyModeMockAWS, ServerModeMockAWS
if TYPE_CHECKING:
@ -17,15 +18,13 @@ def mock_aws(func: "Callable[P, T]") -> "Callable[P, T]":
@overload
def mock_aws(
func: None = None, config: Optional[Dict[str, Dict[str, bool]]] = None
) -> "MockAWS":
def mock_aws(func: None = None, config: Optional[DefaultConfig] = None) -> "MockAWS":
...
def mock_aws(
func: "Optional[Callable[P, T]]" = None,
config: Optional[Dict[str, Dict[str, bool]]] = None,
config: Optional[DefaultConfig] = None,
) -> Union["MockAWS", "Callable[P, T]"]:
clss = (
ServerModeMockAWS

View File

@ -1,6 +1,6 @@
import importlib
import os
from typing import TYPE_CHECKING, Iterable, Union, overload
from typing import TYPE_CHECKING, Iterable, Optional, Union, overload
import moto
@ -155,6 +155,15 @@ def list_of_moto_modules() -> Iterable[str]:
yield backend
def get_service_from_url(url: str) -> Optional[str]:
from moto.backend_index import backend_url_patterns
for service, pattern in backend_url_patterns:
if pattern.match(url):
return service
return None
# There's a similar Union that we could import from boto3-stubs, but it wouldn't have
# moto's custom service backends
SERVICE_NAMES = Union[

View File

@ -9,6 +9,7 @@ import moto.backend_index as backend_index
from moto import settings
from moto.core.base_backend import BackendDict
from moto.core.common_types import TYPE_RESPONSE
from moto.core.config import passthrough_service, passthrough_url
class MockRawResponse(BytesIO):
@ -70,9 +71,15 @@ class BotocoreStubber:
clean_url = f"{x.scheme}://{host}{x.path}"
if passthrough_url(clean_url):
return None
for service, pattern in backend_index.backend_url_patterns:
if pattern.match(clean_url):
if passthrough_service(service):
return None
import moto.backends as backends
from moto.core import DEFAULT_ACCOUNT_ID
from moto.core.exceptions import HTTPException

49
moto/core/config.py Normal file
View File

@ -0,0 +1,49 @@
import re
from typing import List, TypedDict
class _docker_config(TypedDict, total=False):
use_docker: bool
class _passthrough_config(TypedDict, total=False):
services: List[str]
urls: List[str]
class _core_config(TypedDict, total=False):
mock_credentials: bool
passthrough: _passthrough_config
DefaultConfig = TypedDict(
"DefaultConfig",
{"batch": _docker_config, "core": _core_config, "lambda": _docker_config},
total=False,
)
default_user_config: DefaultConfig = {
"batch": {"use_docker": True},
"lambda": {"use_docker": True},
"core": {"mock_credentials": True, "passthrough": {"urls": [], "services": []}},
}
def passthrough_service(service: str) -> bool:
passthrough_services = (
default_user_config.get("core", {}).get("passthrough", {}).get("services", [])
)
return service in passthrough_services
def passthrough_url(clean_url: str) -> bool:
passthrough_urls = (
default_user_config.get("core", {}).get("passthrough", {}).get("urls", [])
)
return any([re.match(url, clean_url) for url in passthrough_urls])
def mock_credentials() -> bool:
return (
default_user_config.get("core", {}).get("mock_credentials", True) is not False
)

View File

@ -7,6 +7,8 @@ from urllib.parse import urlparse
import responses
from werkzeug.wrappers import Request
from moto.backends import get_service_from_url
from moto.core.config import passthrough_service, passthrough_url
from moto.core.versions import is_responses_0_17_x
from .responses import TYPE_RESPONSE
@ -70,6 +72,19 @@ class CallbackResponse(responses.CallbackResponse):
decode_content=False,
)
def matches(self, request: "responses.PreparedRequest") -> Tuple[bool, str]:
if request.method != self.method:
return False, "Method does not match"
if not self._url_matches(self.url, str(request.url)):
return False, "URL does not match"
service = get_service_from_url(request.url) # type: ignore
if (service and passthrough_service(service)) or passthrough_url(request.url): # type: ignore
return False, "URL does not match"
return super().matches(request)
def _url_matches(
self, url: Any, other: Any, match_querystring: bool = False
) -> bool:

View File

@ -28,6 +28,7 @@ import moto.backend_index as backend_index
from moto import settings
from .botocore_stubber import BotocoreStubber
from .config import DefaultConfig, default_user_config, mock_credentials
from .custom_responses_mock import (
CallbackResponse,
get_response_mock,
@ -51,12 +52,12 @@ class MockAWS(ContextManager["MockAWS"]):
mocks_active = False
mock_init_lock = Lock()
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
self.FAKE_KEYS = {
def __init__(self, config: Optional[DefaultConfig] = None) -> None:
self._fake_creds = {
"AWS_ACCESS_KEY_ID": "FOOBARKEY",
"AWS_SECRET_ACCESS_KEY": "FOOBARSECRET",
}
self.ORIG_KEYS: Dict[str, Optional[str]] = {}
self._orig_creds: Dict[str, Optional[str]] = {}
self.default_session_mock = patch("boto3.DEFAULT_SESSION", None)
current_user_config = default_user_config.copy()
current_user_config.update(config or {})
@ -78,10 +79,12 @@ class MockAWS(ContextManager["MockAWS"]):
def start(self, reset: bool = True) -> None:
with MockAWS.mock_init_lock:
if not self.__class__.mocks_active:
self.user_config_mock.start()
if mock_credentials():
self.mock_env_variables()
self.__class__.mocks_active = True
if not self.__class__.mocks_active:
self.default_session_mock.start()
self.__class__.mocks_active = True
self.__class__.nested_count += 1
@ -95,10 +98,13 @@ class MockAWS(ContextManager["MockAWS"]):
if self.__class__.nested_count < 0:
raise RuntimeError("Called stop() before start().")
if mock_credentials():
self.unmock_env_variables()
if self.__class__.nested_count == 0:
if self.__class__.mocks_active:
self.default_session_mock.stop()
self.unmock_env_variables()
self.user_config_mock.stop()
self.__class__.mocks_active = False
self.disable_patching(remove_data)
@ -187,17 +193,12 @@ class MockAWS(ContextManager["MockAWS"]):
def mock_env_variables(self) -> None:
# "Mock" the AWS credentials as they can't be mocked in Botocore currently
# self.env_variables_mocks = mock.patch.dict(os.environ, FAKE_KEYS)
# self.env_variables_mocks.start()
for k, v in self.FAKE_KEYS.items():
self.ORIG_KEYS[k] = os.environ.get(k, None)
for k, v in self._fake_creds.items():
self._orig_creds[k] = os.environ.get(k, None)
os.environ[k] = v
def unmock_env_variables(self) -> None:
# This doesn't work in Python2 - for some reason, unmocking clears the entire os.environ dict
# Obviously bad user experience, and also breaks pytest - as it uses PYTEST_CURRENT_TEST as an env var
# self.env_variables_mocks.stop()
for k, v in self.ORIG_KEYS.items():
for k, v in self._orig_creds.items():
if v:
os.environ[k] = v
else:
@ -214,7 +215,6 @@ class MockAWS(ContextManager["MockAWS"]):
if reset:
self.reset()
responses_mock.start()
self.user_config_mock.start()
for method in RESPONSES_METHODS:
for _, pattern in backend_index.backend_url_patterns:
@ -239,11 +239,7 @@ class MockAWS(ContextManager["MockAWS"]):
self.reset()
reset_model_data()
try:
responses_mock.stop()
except RuntimeError:
pass
self.user_config_mock.stop()
responses_mock.stop()
def get_direct_methods_of(klass: object) -> Set[str]:
@ -272,8 +268,6 @@ BOTOCORE_HTTP_METHODS = ["GET", "DELETE", "HEAD", "OPTIONS", "PATCH", "POST", "P
botocore_stubber = BotocoreStubber()
BUILTIN_HANDLERS.append(("before-send", botocore_stubber))
default_user_config = {"batch": {"use_docker": True}, "lambda": {"use_docker": True}}
def patch_client(client: botocore.client.BaseClient) -> None:
"""

View File

@ -0,0 +1,7 @@
from moto.backends import get_service_from_url
def test_get_service_from_url() -> None:
assert get_service_from_url("https://s3.amazonaws.com") == "s3"
assert get_service_from_url("https://bucket.s3.amazonaws.com") == "s3"
assert get_service_from_url("https://unknown.com") is None

View File

@ -1,4 +1,5 @@
import os
from unittest.mock import patch
from moto import mock_aws
@ -7,8 +8,25 @@ KEY = "AWS_ACCESS_KEY_ID"
def test_aws_keys_are_patched() -> None:
with mock_aws():
patched_value = os.environ[KEY]
assert patched_value == "FOOBARKEY"
assert os.environ[KEY] == "FOOBARKEY"
def test_aws_keys_are_not_patched_when_user_configured() -> None:
with patch.dict(os.environ, {"AWS_ACCESS_KEY_ID": "diff_value"}):
# Sanity check
assert os.environ["AWS_ACCESS_KEY_ID"] == "diff_value"
# Moto will not mock the credentials, and we see the 'original' value
with mock_aws(config={"core": {"mock_credentials": False}}):
assert os.environ[KEY] == "diff_value"
# Nested mocks are possible
# For the duration of the inner mock, Moto will patch the credentials
with mock_aws(config={"core": {"mock_credentials": True}}):
assert os.environ[KEY] == "FOOBARKEY"
# The moment the inner patch ends, we're back to the 'original' value
assert os.environ[KEY] == "diff_value"
def test_aws_keys_can_be_none() -> None:
@ -25,8 +43,7 @@ def test_aws_keys_can_be_none() -> None:
try:
# Verify that the os.environ[KEY] is patched
with mock_aws():
patched_value = os.environ[KEY]
assert patched_value == "FOOBARKEY"
assert os.environ[KEY] == "FOOBARKEY"
# Verify that the os.environ[KEY] is unpatched, and reverts to None
assert os.environ.get(KEY) is None
finally:

View File

@ -0,0 +1,147 @@
import os
from unittest import SkipTest
from unittest.mock import patch
import boto3
import pytest
import requests
from botocore.exceptions import ClientError
from moto import mock_aws, settings
def test_passthrough_calls_for_entire_service() -> None:
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test config when using decorators")
# Still mock the credentials ourselves, we don't want to reach out to AWS for real
with patch.dict(
os.environ, {"AWS_ACCESS_KEY_ID": "a", "AWS_SECRET_ACCESS_KEY": "b"}
):
list_buckets_url = "https://s3.amazonaws.com/"
# All requests to S3 are passed through
with mock_aws(
config={
"core": {"mock_credentials": False, "passthrough": {"services": ["s3"]}}
}
):
s3 = boto3.client("s3", "us-east-1")
with pytest.raises(ClientError) as exc:
s3.list_buckets()
assert exc.value.response["Error"]["Code"] == "InvalidAccessKeyId"
resp = _aws_request(list_buckets_url)
assert resp.status_code == 403
# Calls to SQS are mocked normally
sqs = boto3.client("sqs", "us-east-1")
sqs.list_queues()
# Sanity check that the passthrough does not persist
with mock_aws():
s3 = boto3.client("s3", "us-east-1")
assert s3.list_buckets()["Buckets"] == []
resp = _aws_request(list_buckets_url)
assert resp.status_code == 200
assert b"<Buckets></Buckets>" in resp.content
def test_passthrough_calls_for_specific_url() -> None:
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test config when using decorators")
# Still mock the credentials ourselves, we don't want to reach out to AWS for real
with patch.dict(
os.environ, {"AWS_ACCESS_KEY_ID": "a", "AWS_SECRET_ACCESS_KEY": "b"}
):
list_buckets_url = "https://s3.amazonaws.com/"
# All requests to these URL's are passed through
with mock_aws(
config={
"core": {
"mock_credentials": False,
"passthrough": {"urls": ["https://realbucket.s3.amazonaws.com/"]},
}
}
):
s3 = boto3.client("s3", "us-east-1")
with pytest.raises(ClientError) as exc:
s3.create_bucket(Bucket="realbucket")
assert exc.value.response["Error"]["Code"] == "InvalidAccessKeyId"
# List buckets works
assert _aws_request(list_buckets_url).status_code == 200
assert s3.list_buckets()["Buckets"] == []
# Creating different buckets works
s3.create_bucket(Bucket="diff")
# Manual requests are also not allowed
assert (
_aws_request("https://realbucket.s3.amazonaws.com/").status_code == 403
)
def test_passthrough_calls_for_wildcard_urls() -> None:
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test config when using decorators")
# Still mock the credentials ourselves, we don't want to reach out to AWS for real
with patch.dict(
os.environ, {"AWS_ACCESS_KEY_ID": "a", "AWS_SECRET_ACCESS_KEY": "b"}
):
# All requests to these URL's are passed through
with mock_aws(
config={
"core": {
"mock_credentials": False,
"passthrough": {
"urls": [
"https://companyname_*.s3.amazonaws.com/",
"https://s3.amazonaws.com/companyname_*",
]
},
}
}
):
s3 = boto3.client("s3", "us-east-1")
with pytest.raises(ClientError) as exc:
s3.create_bucket(Bucket="companyname_prod")
assert exc.value.response["Error"]["Code"] == "InvalidAccessKeyId"
# Creating different buckets works
s3.create_bucket(Bucket="diffcompany_prod")
# Manual requests are also not allowed
assert (
_aws_request("https://s3.amazonaws.com/companyname_prod").status_code
== 403
)
def test_passthrough__using_unsupported_service() -> None:
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("Can only test config when using decorators")
with patch.dict(
os.environ, {"AWS_ACCESS_KEY_ID": "a", "AWS_SECRET_ACCESS_KEY": "b"}
):
# Requests to unsupported services still throw a NotYetImplemented
with mock_aws(
config={
"core": {
"mock_credentials": False,
"passthrough": {"services": ["s3"]},
}
}
):
b2bi = boto3.client("b2bi", "us-east-1")
with pytest.raises(ClientError) as exc:
b2bi.list_transformers()
assert "Not yet implemented" in str(exc.value)
def _aws_request(url: str) -> requests.Response:
creds = b"AWS4-HMAC-SHA256 Credential=a/20240107/us-east-1/s3/aws4_request, Signature=sig"
headers = {"Authorization": creds, "X-Amz-Content-SHA256": b"UNSIGNED-PAYLOAD"}
return requests.get(url, headers=headers)