242 lines
8.0 KiB
Python
242 lines
8.0 KiB
Python
import io
|
|
from urllib.parse import urlparse, parse_qs
|
|
import sure # pylint: disable=unused-import
|
|
|
|
from flask.testing import FlaskClient
|
|
import moto.server as server
|
|
from unittest.mock import patch
|
|
|
|
"""
|
|
Test the different server responses
|
|
"""
|
|
|
|
|
|
class AuthenticatedClient(FlaskClient):
    """Flask test client that adds an ``Authorization`` header to every request."""

    def open(self, *args, **kwargs):
        """Send a request with a placeholder ``Authorization`` header attached.

        Takes the same positional and keyword arguments as
        ``FlaskClient.open`` and returns its result. Any ``headers`` passed by
        the caller are kept; ``Authorization`` is set (or overwritten) on a
        copy of them, and ``content_length`` is always forced to ``0``.
        """
        supplied = kwargs.get("headers")
        # Copy before adding the header: callers reuse one headers dict across
        # several requests, and it must not be modified behind their back.
        # An explicit ``headers=None`` is treated like "no headers".
        headers = {} if supplied is None else supplied.copy()
        headers["Authorization"] = "Any authorization header"
        kwargs["headers"] = headers
        kwargs["content_length"] = 0  # Fixes content-length complaints.
        return super().open(*args, **kwargs)
|
|
|
|
|
|
def authenticated_client():
    """Return a test client for the "s3" backend app that uses AuthenticatedClient."""
    app = server.create_backend_app("s3")
    # Swap in the client class that sends an Authorization header.
    app.test_client_class = AuthenticatedClient
    client = app.test_client()
    return client
|
|
|
|
|
|
def test_s3_server_get():
    """GET on the root path returns a body containing ListAllMyBucketsResult."""
    client = authenticated_client()
    response = client.get("/")

    response.data.should.contain(b"ListAllMyBucketsResult")
|
|
|
|
|
|
def test_s3_server_bucket_create():
    """PUT to the foobaz subdomain, then list it, store an object and read it back."""
    client = authenticated_client()
    bucket_url = "http://foobaz.localhost:5000/"

    created = client.put("/", bucket_url)
    created.status_code.should.equal(200)

    # The root listing is expected to name the new bucket.
    all_buckets = client.get("/")
    all_buckets.data.should.contain(b"<Name>foobaz</Name>")

    bucket_listing = client.get("/", bucket_url)
    bucket_listing.status_code.should.equal(200)
    bucket_listing.data.should.contain(b"ListBucketResult")

    stored = client.put("/bar", bucket_url, data="test value")
    stored.status_code.should.equal(200)
    assert "ETag" in dict(stored.headers)

    # Reading the key back must return exactly what was stored.
    fetched = client.get("/bar", bucket_url)
    fetched.status_code.should.equal(200)
    fetched.data.should.equal(b"test value")
|
|
|
|
|
|
def test_s3_server_ignore_subdomain_for_bucketnames():
    """With S3_IGNORE_SUBDOMAIN_BUCKETNAME patched to True, PUT /mybucket succeeds.

    The request goes to a subdomain host, and the response body is expected
    to mention ``mybucket``.
    """
    setting_path = "moto.settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME"
    with patch(setting_path, True):
        client = authenticated_client()

        response = client.put("/mybucket", "http://foobaz.localhost:5000/")
        response.status_code.should.equal(200)
        response.data.should.contain(b"mybucket")
|
|
|
|
|
|
def test_s3_server_bucket_versioning():
    """PUT a minimal versioning document to ``/?versioning`` and expect a 200."""
    client = authenticated_client()

    # Just enough XML to enable versioning
    versioning_xml = "<Status>Enabled</Status>"
    response = client.put(
        "/?versioning", "http://foobaz.localhost:5000", data=versioning_xml
    )
    response.status_code.should.equal(200)
|
|
|
|
|
|
def test_s3_server_post_to_bucket():
    """POST a form with ``key`` and ``file`` fields, then GET the key back."""
    client = authenticated_client()
    bucket_url = "http://tester.localhost:5000/"

    created = client.put("/", bucket_url)
    created.status_code.should.equal(200)

    # The POST response itself is not inspected; only the stored object is.
    form_fields = {"key": "the-key", "file": "nothing"}
    client.post("/", "https://tester.localhost:5000/", data=form_fields)

    fetched = client.get("/the-key", bucket_url)
    fetched.status_code.should.equal(200)
    fetched.data.should.equal(b"nothing")
|
|
|
|
|
|
def test_s3_server_post_to_bucket_redirect():
    """POST an upload with ``success_action_redirect`` and follow up on the result.

    Expects a 303 whose ``location`` starts with the redirect base and carries
    ``key`` and ``bucket`` query arguments, with ``${filename}`` in the key
    replaced by the uploaded file's name. The object must then be readable
    under that resolved key.
    """
    client = authenticated_client()
    bucket_url = "http://tester.localhost:5000/"

    created = client.put("/", bucket_url)
    created.status_code.should.equal(200)

    redirect_base = "https://redirect.com/success/"
    filecontent = "nothing"
    filename = "test_filename.txt"
    payload = filecontent.encode("utf8")
    form_fields = {
        "key": "asdf/the-key/${filename}",
        "file": (io.BytesIO(payload), filename),
        "success_action_redirect": redirect_base,
    }
    posted = client.post("/", "https://tester.localhost:5000/", data=form_fields)
    real_key = "asdf/the-key/" + filename
    posted.status_code.should.equal(303)
    location = posted.headers["location"]
    assert location.startswith(redirect_base)

    # The redirect target reports where the upload ended up.
    query = parse_qs(urlparse(location).query)
    assert query["key"][0] == real_key
    assert query["bucket"][0] == "tester"

    fetched = client.get("/" + real_key, bucket_url)
    fetched.status_code.should.equal(200)
    fetched.data.should.equal(payload)
|
|
|
|
|
|
def test_s3_server_post_without_content_length():
    """PUT and POST with an empty CONTENT_LENGTH environ value both return 411."""
    client = authenticated_client()

    # Same check for both verbs, PUT first and POST second.
    attempts = (
        (client.put, "http://tester.localhost:5000/"),
        (client.post, "https://tester.localhost:5000/"),
    )
    for send, base_url in attempts:
        response = send("/", base_url, environ_overrides={"CONTENT_LENGTH": ""})
        response.status_code.should.equal(411)
|
|
|
|
|
|
def test_s3_server_post_unicode_bucket_key():
    """The dispatcher resolves an app when PATH_INFO holds non-ASCII characters.

    The same path is tried first as ``str`` and then as UTF-8 encoded bytes.
    """
    # Make sure that we can deal with non-ascii characters in request URLs (e.g., S3 object names)
    dispatcher = server.DomainDispatcherApplication(server.create_backend_app)
    object_path = "/test-bucket/test-object-てすと"

    for path_info in (object_path, object_path.encode("utf-8")):
        environ = {"HTTP_HOST": "s3.amazonaws.com", "PATH_INFO": path_info}
        assert dispatcher.get_application(environ)
|
|
|
|
|
|
def test_s3_server_post_cors():
    """Check the default CORS headers (set by the flask-cors plugin) on a preflight."""
    client = authenticated_client()
    bucket_url = "http://tester.localhost:5000/"
    # Create the bucket
    client.put("/", bucket_url)

    preflight_headers = {
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "origin, x-requested-with",
        "Origin": "https://localhost:9000",
    }

    response = client.options("/", bucket_url, headers=preflight_headers)
    assert response.status_code in (200, 204)

    # Compare as sets: only the advertised methods matter, not their order.
    allowed_methods = set(response.headers["Access-Control-Allow-Methods"].split(", "))
    assert allowed_methods == {
        "DELETE",
        "PATCH",
        "PUT",
        "GET",
        "HEAD",
        "POST",
        "OPTIONS",
    }

    response.headers.should.have.key("Access-Control-Allow-Origin").which.should.equal(
        "https://localhost:9000"
    )
    response.headers.should.have.key("Access-Control-Allow-Headers").which.should.equal(
        "origin, x-requested-with"
    )
|
|
|
|
|
|
def test_s3_server_post_cors_exposed_header():
    """Check that a bucket's own CORS rules replace the default CORS headers.

    A preflight against a missing bucket must return 403. After the bucket is
    created and given a CORS configuration, preflights for the bucket root
    and for a key must return 200 with headers that mirror that configuration.
    """
    # github.com/spulec/moto/issues/4220

    cors_config_payload = """<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<CORSRule>
<AllowedOrigin>https://example.org</AllowedOrigin>
<AllowedMethod>HEAD</AllowedMethod>
<AllowedMethod>GET</AllowedMethod>
<AllowedMethod>PUT</AllowedMethod>
<AllowedMethod>POST</AllowedMethod>
<AllowedMethod>DELETE</AllowedMethod>
<AllowedHeader>*</AllowedHeader>
<ExposeHeader>ETag</ExposeHeader>
<MaxAgeSeconds>3000</MaxAgeSeconds>
</CORSRule>
</CORSConfiguration>
"""
    # Header values the configuration above should produce on a preflight.
    expected_cors_headers = {
        "Access-Control-Allow-Methods": "HEAD, GET, PUT, POST, DELETE",
        "Access-Control-Allow-Origin": "https://example.org",
        "Access-Control-Allow-Headers": "*",
        "Access-Control-Expose-Headers": "ETag",
        "Access-Control-Max-Age": "3000",
    }
    preflight_headers = {
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "origin, x-requested-with",
        "Origin": "https://localhost:9000",
    }
    bucket_url = "http://testcors.localhost:5000/"
    cors_url = "http://testcors.localhost:5000"

    client = authenticated_client()

    # Returns 403 on non existing bucket
    missing_bucket = client.options("/", bucket_url, headers=preflight_headers)
    assert missing_bucket.status_code == 403

    # Create the bucket
    client.put("/", bucket_url)
    configured = client.put("/?cors", cors_url, data=cors_config_payload)
    assert configured.status_code == 200

    stored_config = client.get("/?cors", cors_url)
    assert b"<ExposedHeader>ETag</ExposedHeader>" in stored_config.data

    # Test OPTIONS bucket response and key response
    for key_name in ("/", "/test"):
        preflight_response = client.options(
            key_name, bucket_url, headers=preflight_headers
        )
        assert preflight_response.status_code == 200
        for header_name, header_value in expected_cors_headers.items():
            assert header_name in preflight_response.headers
            assert preflight_response.headers[header_name] == header_value
|