import io from urllib.parse import urlparse, parse_qs import sure # pylint: disable=unused-import from flask.testing import FlaskClient import moto.server as server from unittest.mock import patch """ Test the different server responses """ class AuthenticatedClient(FlaskClient): def open(self, *args, **kwargs): kwargs["headers"] = kwargs.get("headers", {}) kwargs["headers"]["Authorization"] = "Any authorization header" kwargs["content_length"] = 0 # Fixes content-length complaints. return super().open(*args, **kwargs) def authenticated_client(): backend = server.create_backend_app("s3") backend.test_client_class = AuthenticatedClient return backend.test_client() def test_s3_server_get(): test_client = authenticated_client() res = test_client.get("/") res.data.should.contain(b"ListAllMyBucketsResult") def test_s3_server_bucket_create(): test_client = authenticated_client() res = test_client.put("/", "http://foobaz.localhost:5000/") res.status_code.should.equal(200) res = test_client.get("/") res.data.should.contain(b"foobaz") res = test_client.get("/", "http://foobaz.localhost:5000/") res.status_code.should.equal(200) res.data.should.contain(b"ListBucketResult") res = test_client.put("/bar", "http://foobaz.localhost:5000/", data="test value") res.status_code.should.equal(200) assert "ETag" in dict(res.headers) res = test_client.get("/bar", "http://foobaz.localhost:5000/") res.status_code.should.equal(200) res.data.should.equal(b"test value") def test_s3_server_ignore_subdomain_for_bucketnames(): with patch("moto.settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME", True): test_client = authenticated_client() res = test_client.put("/mybucket", "http://foobaz.localhost:5000/") res.status_code.should.equal(200) res.data.should.contain(b"mybucket") def test_s3_server_bucket_versioning(): test_client = authenticated_client() # Just enough XML to enable versioning body = "Enabled" res = test_client.put("/?versioning", "http://foobaz.localhost:5000", data=body) res.status_code.should.equal(200) def test_s3_server_post_to_bucket(): test_client = authenticated_client() res = test_client.put("/", "http://tester.localhost:5000/") res.status_code.should.equal(200) test_client.post( "/", "https://tester.localhost:5000/", data={"key": "the-key", "file": "nothing"}, ) res = test_client.get("/the-key", "http://tester.localhost:5000/") res.status_code.should.equal(200) res.data.should.equal(b"nothing") def test_s3_server_post_to_bucket_redirect(): test_client = authenticated_client() res = test_client.put("/", "http://tester.localhost:5000/") res.status_code.should.equal(200) redirect_base = "https://redirect.com/success/" filecontent = "nothing" filename = "test_filename.txt" res = test_client.post( "/", "https://tester.localhost:5000/", data={ "key": "asdf/the-key/${filename}", "file": (io.BytesIO(filecontent.encode("utf8")), filename), "success_action_redirect": redirect_base, }, ) real_key = "asdf/the-key/{}".format(filename) res.status_code.should.equal(303) redirect = res.headers["location"] assert redirect.startswith(redirect_base) parts = urlparse(redirect) args = parse_qs(parts.query) assert args["key"][0] == real_key assert args["bucket"][0] == "tester" res = test_client.get("/{}".format(real_key), "http://tester.localhost:5000/") res.status_code.should.equal(200) res.data.should.equal(filecontent.encode("utf8")) def test_s3_server_post_without_content_length(): test_client = authenticated_client() res = test_client.put( "/", "http://tester.localhost:5000/", environ_overrides={"CONTENT_LENGTH": ""} ) res.status_code.should.equal(411) res = test_client.post( "/", "https://tester.localhost:5000/", environ_overrides={"CONTENT_LENGTH": ""} ) res.status_code.should.equal(411) def test_s3_server_post_unicode_bucket_key(): # Make sure that we can deal with non-ascii characters in request URLs (e.g., S3 object names) dispatcher = server.DomainDispatcherApplication(server.create_backend_app) backend_app = dispatcher.get_application( {"HTTP_HOST": "s3.amazonaws.com", "PATH_INFO": "/test-bucket/test-object-てすと"} ) assert backend_app backend_app = dispatcher.get_application( { "HTTP_HOST": "s3.amazonaws.com", "PATH_INFO": "/test-bucket/test-object-てすと".encode("utf-8"), } ) assert backend_app def test_s3_server_post_cors(): """Test default CORS headers set by flask-cors plugin""" test_client = authenticated_client() # Create the bucket test_client.put("/", "http://tester.localhost:5000/") preflight_headers = { "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "origin, x-requested-with", "Origin": "https://localhost:9000", } res = test_client.options( "/", "http://tester.localhost:5000/", headers=preflight_headers ) assert res.status_code in [200, 204] expected_methods = set(["DELETE", "PATCH", "PUT", "GET", "HEAD", "POST", "OPTIONS"]) assert ( set(res.headers["Access-Control-Allow-Methods"].split(", ")) == expected_methods ) res.headers.should.have.key("Access-Control-Allow-Origin").which.should.equal( "https://localhost:9000" ) res.headers.should.have.key("Access-Control-Allow-Headers").which.should.equal( "origin, x-requested-with" ) def test_s3_server_post_cors_exposed_header(): """Test that we can override default CORS headers with custom bucket rules""" # github.com/spulec/moto/issues/4220 cors_config_payload = """ https://example.org HEAD GET PUT POST DELETE * ETag 3000 """ test_client = authenticated_client() preflight_headers = { "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "origin, x-requested-with", "Origin": "https://localhost:9000", } # Returns 403 on non existing bucket preflight_response = test_client.options( "/", "http://testcors.localhost:5000/", headers=preflight_headers ) assert preflight_response.status_code == 403 # Create the bucket test_client.put("/", "http://testcors.localhost:5000/") res = test_client.put( "/?cors", "http://testcors.localhost:5000", data=cors_config_payload ) assert res.status_code == 200 cors_res = test_client.get("/?cors", "http://testcors.localhost:5000") assert b"ETag" in cors_res.data # Test OPTIONS bucket response and key response for key_name in ("/", "/test"): preflight_response = test_client.options( key_name, "http://testcors.localhost:5000/", headers=preflight_headers ) assert preflight_response.status_code == 200 expected_cors_headers = { "Access-Control-Allow-Methods": "HEAD, GET, PUT, POST, DELETE", "Access-Control-Allow-Origin": "https://example.org", "Access-Control-Allow-Headers": "*", "Access-Control-Expose-Headers": "ETag", "Access-Control-Max-Age": "3000", } for header_name, header_value in expected_cors_headers.items(): assert header_name in preflight_response.headers assert preflight_response.headers[header_name] == header_value