Feature: Allow custom endpoints for S3 (#4562)

This commit is contained in:
Bert Blommers 2021-11-17 20:02:14 -01:00 committed by GitHub
parent 8b5e926ec1
commit a912fc4cac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 160 additions and 7 deletions

View File

@ -31,6 +31,7 @@ import shlex
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.autosectionlabel'
]

View File

@ -12,7 +12,7 @@
s3
==
.. autoclass:: moto.s3.models.S3Backend
|start-h3| Example usage |end-h3|

View File

@ -34,6 +34,7 @@ from .utils import (
convert_flask_to_responses_response,
)
ACCOUNT_ID = os.environ.get("MOTO_ACCOUNT_ID", "123456789012")
@ -702,7 +703,7 @@ class BaseBackend:
A dictionary of the urls to be mocked with this service and the handlers
that should be called in their place
"""
url_bases = self._url_module.url_bases
url_bases = self.url_bases
unformatted_paths = self._url_module.url_paths
urls = {}

View File

@ -16,6 +16,7 @@ import time
import uuid
from bisect import insort
from importlib import reload
from moto.core import (
ACCOUNT_ID,
BaseBackend,
@ -1321,11 +1322,37 @@ class FakeBucket(CloudFormationModel):
class S3Backend(BaseBackend, CloudWatchMetricProvider):
"""
Moto implementation for S3.
Custom S3 endpoints are supported, if you are using a S3-compatible storage solution like Ceph.
Example usage:
.. sourcecode:: python
os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = "http://custom.internal.endpoint,http://custom.other.endpoint"
@mock_s3
def test_my_custom_endpoint():
boto3.client("s3", endpoint_url="http://custom.internal.endpoint")
...
Note that this only works if the environment variable is set **before** the mock is initialized.
"""
def __init__(self):
self.buckets = {}
self.account_public_access_block = None
self.tagger = TaggingService()
@property
def _url_module(self):
# The urls-property can be different depending on env variables
# Force a reload, to retrieve the correct set of URLs
import moto.s3.urls as backend_urls_module
reload(backend_urls_module)
return backend_urls_module
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""List of dicts representing default VPC endpoints for this service."""

View File

@ -17,11 +17,11 @@ from urllib.parse import (
import xmltodict
from moto import settings
from moto.packages.httpretty.core import HTTPrettyRequest
from moto.core.responses import _TemplateEnvironmentMixin, ActionAuthenticatorMixin
from moto.core.utils import path_url
from moto.core import ACCOUNT_ID
from moto.settings import S3_IGNORE_SUBDOMAIN_BUCKETNAME
from moto.s3bucket_path.utils import (
bucket_name_from_url as bucketpath_bucket_name_from_url,
@ -188,12 +188,21 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return template.render(buckets=all_buckets)
def subdomain_based_buckets(self, request):
if S3_IGNORE_SUBDOMAIN_BUCKETNAME:
if settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME:
return False
host = request.headers.get("host", request.headers.get("Host"))
if not host:
host = urlparse(request.url).netloc
custom_endpoints = settings.get_s3_custom_endpoints()
if (
host
and custom_endpoints
and any([host in endpoint for endpoint in custom_endpoints])
):
# Default to path-based buckets for S3-compatible SDKs (Ceph, DigitalOcean Spaces, etc)
return False
if (
not host
or host.startswith("localhost")

View File

@ -1,3 +1,5 @@
from moto import settings
from .responses import S3ResponseInstance
url_bases = [
@ -5,6 +7,8 @@ url_bases = [
r"https?://(?P<bucket_name>[a-zA-Z0-9\-_.]*)\.?s3(.*)\.amazonaws.com",
]
url_bases.extend(settings.get_s3_custom_endpoints())
url_paths = {
# subdomain bucket
"{0}/$": S3ResponseInstance.bucket_response,

View File

@ -24,6 +24,13 @@ def get_sf_execution_history_type():
return os.environ.get("SF_EXECUTION_HISTORY_TYPE", "SUCCESS")
def get_s3_custom_endpoints():
endpoints = os.environ.get("MOTO_S3_CUSTOM_ENDPOINTS")
if endpoints:
return endpoints.split(",")
return []
S3_UPLOAD_PART_MIN_SIZE = 5242880

View File

@ -34,11 +34,21 @@ def get_moto_implementation(service_name):
return backends[0], mock_name
def get_module_name(o):
klass = o.__class__
module = klass.__module__
if module == 'builtins':
return klass.__qualname__ # avoid outputs like 'builtins.str'
return module + '.' + klass.__qualname__
def calculate_extended_implementation_coverage():
service_names = Session().get_available_services()
coverage = {}
for service_name in service_names:
moto_client, mock_name = get_moto_implementation(service_name)
if not moto_client:
continue
real_client = boto3.client(service_name, region_name="us-east-1")
implemented = dict()
not_implemented = []
@ -54,6 +64,7 @@ def calculate_extended_implementation_coverage():
coverage[service_name] = {
"docs": moto_client.__doc__,
"module_name": get_module_name(moto_client),
"name": mock_name,
"implemented": implemented,
"not_implemented": not_implemented,
@ -201,8 +212,10 @@ def write_implementation_coverage_to_docs(coverage):
file.write(("=" * len(title)) + "\n")
file.write("\n")
file.write(coverage[service_name].get("docs") or "")
file.write("\n\n")
if coverage[service_name]["docs"]:
# Only show auto-generated documentation if it exists
file.write(".. autoclass:: " + coverage[service_name].get("module_name"))
file.write("\n\n")
file.write("|start-h3| Example usage |end-h3|\n\n")
file.write(f""".. sourcecode:: python

View File

@ -0,0 +1,91 @@
import boto3
import sure # noqa # pylint: disable=unused-import
import os
import pytest
from moto import mock_s3, settings
from unittest import SkipTest
from unittest.mock import patch
DEFAULT_REGION_NAME = "us-east-1"
CUSTOM_ENDPOINT = "https://s3.local.some-test-domain.de"
CUSTOM_ENDPOINT_2 = "https://caf-o.s3-ext.jc.rl.ac.uk"
@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_create_and_list_buckets(url):
if settings.TEST_SERVER_MODE:
raise SkipTest("Unable to set ENV VAR in ServerMode")
# Have to inline this, as the URL-param is not available as a context decorator
with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
# Mock needs to be started after the environment variable is patched in
with mock_s3():
bucket = "mybucket"
conn = boto3.resource("s3", endpoint_url=url)
conn.create_bucket(Bucket=bucket)
s3 = boto3.client("s3", endpoint_url=url)
all_buckets = s3.list_buckets()["Buckets"]
[b["Name"] for b in all_buckets].should.contain(bucket)
@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_create_and_list_buckets_with_multiple_supported_endpoints(url):
if settings.TEST_SERVER_MODE:
raise SkipTest("Unable to set ENV VAR in ServerMode")
# Have to inline this, as the URL-param is not available as a context decorator
with patch.dict(
os.environ,
{"MOTO_S3_CUSTOM_ENDPOINTS": f"{CUSTOM_ENDPOINT},{CUSTOM_ENDPOINT_2}"},
):
# Mock needs to be started after the environment variable is patched in
with mock_s3():
bucket = "mybucket"
conn = boto3.resource("s3", endpoint_url=url)
conn.create_bucket(Bucket=bucket)
s3 = boto3.client("s3", endpoint_url=url)
all_buckets = s3.list_buckets()["Buckets"]
[b["Name"] for b in all_buckets].should.contain(bucket)
@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
@mock_s3
def test_put_and_get_object(url):
if settings.TEST_SERVER_MODE:
raise SkipTest("Unable to set ENV VAR in ServerMode")
with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
with mock_s3():
bucket = "mybucket"
key = "file.txt"
contents = "file contents"
conn = boto3.resource("s3", endpoint_url=url)
conn.create_bucket(Bucket=bucket)
s3 = boto3.client("s3", endpoint_url=url)
s3.put_object(Bucket=bucket, Key=key, Body=contents)
body = conn.Object(bucket, key).get()["Body"].read().decode()
body.should.equal(contents)
@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
@mock_s3
def test_put_and_list_objects(url):
if settings.TEST_SERVER_MODE:
raise SkipTest("Unable to set ENV VAR in ServerMode")
with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
with mock_s3():
bucket = "mybucket"
s3 = boto3.client("s3", endpoint_url=url)
s3.create_bucket(Bucket=bucket)
s3.put_object(Bucket=bucket, Key="one", Body=b"1")
s3.put_object(Bucket=bucket, Key="two", Body=b"22")
s3.put_object(Bucket=bucket, Key="three", Body=b"333")
contents = s3.list_objects(Bucket=bucket)["Contents"]
contents.should.have.length_of(3)
[c["Key"] for c in contents].should.contain("two")

View File

@ -55,7 +55,7 @@ def test_s3_server_bucket_create():
def test_s3_server_ignore_subdomain_for_bucketnames():
with patch("moto.s3.responses.S3_IGNORE_SUBDOMAIN_BUCKETNAME", True):
with patch("moto.settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME", True):
test_client = authenticated_client()
res = test_client.put("/mybucket", "http://foobaz.localhost:5000/")