"""Tests for S3 requests sent to custom endpoints via MOTO_S3_CUSTOM_ENDPOINTS."""
import os
from unittest import SkipTest
from unittest.mock import patch

import boto3
import pytest

from moto import mock_s3, settings

# Non-AWS endpoint URLs that the tests below are parametrized over. Each test
# registers them with Moto through the MOTO_S3_CUSTOM_ENDPOINTS env variable.
CUSTOM_ENDPOINT = "https://s3.local.some-test-domain.de"
CUSTOM_ENDPOINT_2 = "https://caf-o.s3-ext.jc.rl.ac.uk"


@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_create_and_list_buckets(url):
    """A bucket created against a custom endpoint shows up in list_buckets.

    Skipped in ServerMode, where the environment variable cannot be set.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Unable to set ENV VAR in ServerMode")
    # Have to inline this, as the URL-param is not available as a context decorator
    with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
        # Mock needs to be started after the environment variable is patched in
        with mock_s3():
            bucket = "mybucket"
            conn = boto3.resource("s3", endpoint_url=url)
            # The bucket must exist before it can be expected in the listing.
            conn.create_bucket(Bucket=bucket)

            s3_client = boto3.client("s3", endpoint_url=url)
            all_buckets = s3_client.list_buckets()["Buckets"]
            assert bucket in [b["Name"] for b in all_buckets]


@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_create_and_list_buckets_with_multiple_supported_endpoints(url):
    """Buckets can be created and listed when several endpoints are registered.

    Both custom endpoints are registered at once; the parametrized ``url``
    selects which of them the clients actually talk to.
    Skipped in ServerMode, where the environment variable cannot be set.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Unable to set ENV VAR in ServerMode")
    # Have to inline this, as the URL-param is not available as a context decorator
    with patch.dict(
        os.environ,
        # Register both endpoints together as a comma-separated list.
        {"MOTO_S3_CUSTOM_ENDPOINTS": f"{CUSTOM_ENDPOINT},{CUSTOM_ENDPOINT_2}"},
    ):
        # Mock needs to be started after the environment variable is patched in
        with mock_s3():
            bucket = "mybucket"
            conn = boto3.resource("s3", endpoint_url=url)
            # The bucket must exist before it can be expected in the listing.
            conn.create_bucket(Bucket=bucket)

            s3_client = boto3.client("s3", endpoint_url=url)
            all_buckets = s3_client.list_buckets()["Buckets"]
            assert bucket in [b["Name"] for b in all_buckets]


@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_put_and_get_object(url):
    """An object written through a custom endpoint can be read back unchanged.

    Skipped in ServerMode, where the environment variable cannot be set.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Unable to set ENV VAR in ServerMode")
    # The environment variable has to be patched before the mock is started.
    with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
        with mock_s3():
            bucket = "mybucket"
            key = "file.txt"
            contents = "file contents"
            conn = boto3.resource("s3", endpoint_url=url)
            # put_object needs an existing bucket to write into.
            conn.create_bucket(Bucket=bucket)

            s3_client = boto3.client("s3", endpoint_url=url)
            s3_client.put_object(Bucket=bucket, Key=key, Body=contents)

            # Read back via the resource API to cross-check the client write.
            body = conn.Object(bucket, key).get()["Body"].read().decode()

            assert body == contents


@pytest.mark.parametrize("url", [CUSTOM_ENDPOINT, CUSTOM_ENDPOINT_2])
def test_put_and_list_objects(url):
    """Objects written through a custom endpoint are returned by list_objects.

    Skipped in ServerMode, where the environment variable cannot be set.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Unable to set ENV VAR in ServerMode")
    # The environment variable has to be patched before the mock is started.
    with patch.dict(os.environ, {"MOTO_S3_CUSTOM_ENDPOINTS": url}):
        with mock_s3():
            bucket = "mybucket"

            s3_client = boto3.client("s3", endpoint_url=url)
            # put_object needs an existing bucket to write into.
            s3_client.create_bucket(Bucket=bucket)
            s3_client.put_object(Bucket=bucket, Key="one", Body=b"1")
            s3_client.put_object(Bucket=bucket, Key="two", Body=b"22")
            s3_client.put_object(Bucket=bucket, Key="three", Body=b"333")

            contents = s3_client.list_objects(Bucket=bucket)["Contents"]
            assert len(contents) == 3
            assert "two" in [c["Key"] for c in contents]