2017-09-16 13:38:40 +00:00
|
|
|
from flask.testing import FlaskClient
|
2023-11-30 15:55:51 +00:00
|
|
|
|
2013-10-28 20:43:25 +00:00
|
|
|
import moto.server as server
|
|
|
|
|
|
|
|
|
2017-09-16 13:38:40 +00:00
|
|
|
class AuthenticatedClient(FlaskClient):
    """Flask test client that decorates each ``open`` call before dispatch.

    Every call gets an ``Authorization`` header and a ``content_length``
    of zero, then is forwarded to ``FlaskClient.open``.
    """

    def open(self, *args, **kwargs):
        """Inject the auth header and content length, then delegate.

        If the caller passed a ``headers`` mapping it is reused (and
        therefore modified in place); otherwise a fresh dict is created.
        """
        request_headers = kwargs.setdefault("headers", {})
        request_headers["Authorization"] = "Any authorization header"
        # Fixes content-length complaints.
        kwargs["content_length"] = 0
        return super().open(*args, **kwargs)
|
2017-09-16 13:38:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
def authenticated_client():
    """Build an S3 backend app and return its ``AuthenticatedClient``.

    A new backend app is created via ``server.create_backend_app("s3")``
    on every call, and its test client class is swapped for
    ``AuthenticatedClient`` before the client is instantiated.
    """
    s3_app = server.create_backend_app("s3")
    s3_app.test_client_class = AuthenticatedClient
    return s3_app.test_client()
|
|
|
|
|
|
|
|
|
|
|
|
def test_s3_server_get():
    """GET on the service root returns a ListAllMyBucketsResult document."""
    client = authenticated_client()

    response = client.get("/")

    assert b"ListAllMyBucketsResult" in response.data
|
2013-10-28 20:43:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_s3_server_bucket_create():
    """Create buckets, list them, probe a missing bucket, then round-trip a key."""
    client = authenticated_client()
    # Passed positionally as the second argument of every bucket request.
    base_url = "http://localhost:5000"

    # Each bucket is created, looked up in the root listing, and then
    # listed directly - once without and once with a trailing slash.
    bucket_cases = (
        ("/foobar", b"<Name>foobar</Name>"),
        ("/foobar2/", b"<Name>foobar2</Name>"),
    )
    for bucket_path, listing_entry in bucket_cases:
        created = client.put(bucket_path, base_url)
        assert created.status_code == 200

        all_buckets = client.get("/")
        assert listing_entry in all_buckets.data

        contents = client.get(bucket_path, base_url)
        assert contents.status_code == 200
        assert b"ListBucketResult" in contents.data

    # A bucket that was never created must yield 404.
    missing = client.get("/missing-bucket", base_url)
    assert missing.status_code == 404

    # Store an object in the first bucket and read it back unchanged.
    stored = client.put("/foobar/bar", base_url, data="test value")
    assert stored.status_code == 200

    fetched = client.get("/foobar/bar", base_url)
    assert fetched.status_code == 200
    assert fetched.data == b"test value"
|
2013-10-28 20:43:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_s3_server_post_to_bucket():
    """POST a form upload to a bucket on localhost and read the key back."""
    client = authenticated_client()
    http_url = "http://localhost:5000/"

    created = client.put("/foobar2", http_url)
    assert created.status_code == 200

    # The upload goes through the https URL; its response is not inspected.
    upload_form = {"key": "the-key", "file": "nothing"}
    client.post("/foobar2", "https://localhost:5000/", data=upload_form)

    fetched = client.get("/foobar2/the-key", http_url)
    assert fetched.status_code == 200
    assert fetched.data == b"nothing"
|
2016-12-03 23:15:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_s3_server_put_ipv6():
    """Same create/upload/fetch flow using an IPv6 literal host (``[::]``)."""
    client = authenticated_client()
    http_url = "http://[::]:5000/"

    created = client.put("/foobar2", http_url)
    assert created.status_code == 200

    # The upload goes through the https URL; its response is not inspected.
    upload_form = {"key": "the-key", "file": "nothing"}
    client.post("/foobar2", "https://[::]:5000/", data=upload_form)

    fetched = client.get("/foobar2/the-key", http_url)
    assert fetched.status_code == 200
    assert fetched.data == b"nothing"
|
2016-12-03 23:15:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_s3_server_put_ipv4():
    """Same create/upload/fetch flow using an IPv4 host (``127.0.0.1``)."""
    client = authenticated_client()
    http_url = "http://127.0.0.1:5000/"

    created = client.put("/foobar2", http_url)
    assert created.status_code == 200

    # The upload goes through the https URL; its response is not inspected.
    upload_form = {"key": "the-key", "file": "nothing"}
    client.post("/foobar2", "https://127.0.0.1:5000/", data=upload_form)

    fetched = client.get("/foobar2/the-key", http_url)
    assert fetched.status_code == 200
    assert fetched.data == b"nothing"
|