import io
from urllib.parse import urlparse, parse_qs
import sure  # pylint: disable=unused-import
from flask.testing import FlaskClient
import moto.server as server
from unittest.mock import patch
"""
Test the different server responses
"""
class AuthenticatedClient(FlaskClient):
    def open(self, *args, **kwargs):
        kwargs["headers"] = kwargs.get("headers", {})
        kwargs["headers"]["Authorization"] = "Any authorization header"
        kwargs["content_length"] = 0  # Fixes content-length complaints.
        return super().open(*args, **kwargs)
def authenticated_client():
    backend = server.create_backend_app("s3")
    backend.test_client_class = AuthenticatedClient
    return backend.test_client()
def test_s3_server_get():
    test_client = authenticated_client()
    res = test_client.get("/")
    res.data.should.contain(b"ListAllMyBucketsResult")
def test_s3_server_bucket_create():
    test_client = authenticated_client()
    res = test_client.put("/", "http://foobaz.localhost:5000/")
    res.status_code.should.equal(200)
    res = test_client.get("/")
    res.data.should.contain(b"foobaz")
    res = test_client.get("/", "http://foobaz.localhost:5000/")
    res.status_code.should.equal(200)
    res.data.should.contain(b"ListBucketResult")
    res = test_client.put("/bar", "http://foobaz.localhost:5000/", data="test value")
    res.status_code.should.equal(200)
    assert "ETag" in dict(res.headers)
    res = test_client.get("/bar", "http://foobaz.localhost:5000/")
    res.status_code.should.equal(200)
    res.data.should.equal(b"test value")
def test_s3_server_ignore_subdomain_for_bucketnames():
    with patch("moto.settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME", True):
        test_client = authenticated_client()
        res = test_client.put("/mybucket", "http://foobaz.localhost:5000/")
        res.status_code.should.equal(200)
        res.data.should.contain(b"mybucket")
def test_s3_server_bucket_versioning():
    test_client = authenticated_client()
    # Just enough XML to enable versioning
    body = "Enabled"
    res = test_client.put("/?versioning", "http://foobaz.localhost:5000", data=body)
    res.status_code.should.equal(200)
def test_s3_server_post_to_bucket():
    test_client = authenticated_client()
    res = test_client.put("/", "http://tester.localhost:5000/")
    res.status_code.should.equal(200)
    test_client.post(
        "/",
        "https://tester.localhost:5000/",
        data={"key": "the-key", "file": "nothing"},
    )
    res = test_client.get("/the-key", "http://tester.localhost:5000/")
    res.status_code.should.equal(200)
    res.data.should.equal(b"nothing")
def test_s3_server_post_to_bucket_redirect():
    test_client = authenticated_client()
    res = test_client.put("/", "http://tester.localhost:5000/")
    res.status_code.should.equal(200)
    redirect_base = "https://redirect.com/success/"
    filecontent = "nothing"
    filename = "test_filename.txt"
    res = test_client.post(
        "/",
        "https://tester.localhost:5000/",
        data={
            "key": "asdf/the-key/${filename}",
            "file": (io.BytesIO(filecontent.encode("utf8")), filename),
            "success_action_redirect": redirect_base,
        },
    )
    real_key = "asdf/the-key/{}".format(filename)
    res.status_code.should.equal(303)
    redirect = res.headers["location"]
    assert redirect.startswith(redirect_base)
    parts = urlparse(redirect)
    args = parse_qs(parts.query)
    assert args["key"][0] == real_key
    assert args["bucket"][0] == "tester"
    res = test_client.get("/{}".format(real_key), "http://tester.localhost:5000/")
    res.status_code.should.equal(200)
    res.data.should.equal(filecontent.encode("utf8"))
def test_s3_server_post_without_content_length():
    test_client = authenticated_client()
    res = test_client.put(
        "/", "http://tester.localhost:5000/", environ_overrides={"CONTENT_LENGTH": ""}
    )
    res.status_code.should.equal(411)
    res = test_client.post(
        "/", "https://tester.localhost:5000/", environ_overrides={"CONTENT_LENGTH": ""}
    )
    res.status_code.should.equal(411)
def test_s3_server_post_unicode_bucket_key():
    # Make sure that we can deal with non-ascii characters in request URLs (e.g., S3 object names)
    dispatcher = server.DomainDispatcherApplication(server.create_backend_app)
    backend_app = dispatcher.get_application(
        {"HTTP_HOST": "s3.amazonaws.com", "PATH_INFO": "/test-bucket/test-object-てすと"}
    )
    assert backend_app
    backend_app = dispatcher.get_application(
        {
            "HTTP_HOST": "s3.amazonaws.com",
            "PATH_INFO": "/test-bucket/test-object-てすと".encode("utf-8"),
        }
    )
    assert backend_app
def test_s3_server_post_cors():
    """Test default CORS headers set by flask-cors plugin"""
    test_client = authenticated_client()
    # Create the bucket
    test_client.put("/", "http://tester.localhost:5000/")
    preflight_headers = {
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "origin, x-requested-with",
        "Origin": "https://localhost:9000",
    }
    res = test_client.options(
        "/", "http://tester.localhost:5000/", headers=preflight_headers
    )
    assert res.status_code in [200, 204]
    expected_methods = set(["DELETE", "PATCH", "PUT", "GET", "HEAD", "POST", "OPTIONS"])
    assert (
        set(res.headers["Access-Control-Allow-Methods"].split(", ")) == expected_methods
    )
    res.headers.should.have.key("Access-Control-Allow-Origin").which.should.equal(
        "https://localhost:9000"
    )
    res.headers.should.have.key("Access-Control-Allow-Headers").which.should.equal(
        "origin, x-requested-with"
    )
def test_s3_server_post_cors_exposed_header():
    """Test that we can override default CORS headers with custom bucket rules"""
    # github.com/spulec/moto/issues/4220
    cors_config_payload = """
  
    https://example.org
    HEAD
    GET
    PUT
    POST
    DELETE
    *
    ETag
    3000
  
    """
    test_client = authenticated_client()
    preflight_headers = {
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "origin, x-requested-with",
        "Origin": "https://localhost:9000",
    }
    # Returns 403 on non existing bucket
    preflight_response = test_client.options(
        "/", "http://testcors.localhost:5000/", headers=preflight_headers
    )
    assert preflight_response.status_code == 403
    # Create the bucket
    test_client.put("/", "http://testcors.localhost:5000/")
    res = test_client.put(
        "/?cors", "http://testcors.localhost:5000", data=cors_config_payload
    )
    assert res.status_code == 200
    cors_res = test_client.get("/?cors", "http://testcors.localhost:5000")
    assert b"ETag" in cors_res.data
    # Test OPTIONS bucket response and key response
    for key_name in ("/", "/test"):
        preflight_response = test_client.options(
            key_name, "http://testcors.localhost:5000/", headers=preflight_headers
        )
        assert preflight_response.status_code == 200
        expected_cors_headers = {
            "Access-Control-Allow-Methods": "HEAD, GET, PUT, POST, DELETE",
            "Access-Control-Allow-Origin": "https://example.org",
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Expose-Headers": "ETag",
            "Access-Control-Max-Age": "3000",
        }
        for header_name, header_value in expected_cors_headers.items():
            assert header_name in preflight_response.headers
            assert preflight_response.headers[header_name] == header_value