import io
import re
import urllib.parse
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union
from urllib.parse import parse_qs, unquote, urlencode, urlparse, urlunparse
from xml.dom import minidom
import xmltodict
from moto import settings
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from moto.core.utils import (
extract_region_from_aws_authorization,
path_url,
str_to_rfc_1123_datetime,
)
from moto.s3bucket_path.utils import (
bucket_name_from_url as bucketpath_bucket_name_from_url,
)
from moto.s3bucket_path.utils import (
parse_key_name as bucketpath_parse_key_name,
)
from .exceptions import (
AccessForbidden,
BucketAccessDeniedError,
BucketAlreadyExists,
BucketMustHaveLockeEnabled,
DuplicateTagKeys,
HeadOnDeleteMarker,
IllegalLocationConstraintException,
InvalidContentMD5,
InvalidContinuationToken,
InvalidMaxPartArgument,
InvalidMaxPartNumberArgument,
InvalidNotificationARN,
InvalidNotificationEvent,
InvalidObjectState,
InvalidPartOrder,
InvalidRange,
LockNotEnabled,
MalformedACLError,
MalformedXML,
MissingBucket,
MissingKey,
MissingVersion,
NoSystemTags,
NotAnIntegerException,
ObjectNotInActiveTierError,
PreconditionFailed,
S3AclAndGrantError,
S3ClientError,
)
from .models import (
FakeAcl,
FakeBucket,
FakeGrant,
FakeGrantee,
FakeKey,
S3Backend,
get_canned_acl,
s3_backends,
)
from .notifications import S3NotificationEvent
from .select_object_content import serialize_select
from .utils import (
ARCHIVE_STORAGE_CLASSES,
bucket_name_from_url,
compute_checksum,
cors_matches_origin,
metadata_from_headers,
parse_region_from_url,
)
DEFAULT_REGION_NAME = "us-east-1"
ACTION_MAP = {
"BUCKET": {
"HEAD": {"DEFAULT": "HeadBucket"},
"GET": {
"uploads": "ListBucketMultipartUploads",
"location": "GetBucketLocation",
"lifecycle": "GetLifecycleConfiguration",
"versioning": "GetBucketVersioning",
"policy": "GetBucketPolicy",
"website": "GetBucketWebsite",
"acl": "GetBucketAcl",
"tagging": "GetBucketTagging",
"logging": "GetBucketLogging",
"cors": "GetBucketCORS",
"notification": "GetBucketNotification",
"accelerate": "GetAccelerateConfiguration",
"versions": "ListBucketVersions",
"public_access_block": "GetPublicAccessBlock",
"DEFAULT": "ListBucket",
},
"PUT": {
"lifecycle": "PutLifecycleConfiguration",
"versioning": "PutBucketVersioning",
"policy": "PutBucketPolicy",
"website": "PutBucketWebsite",
"acl": "PutBucketAcl",
"tagging": "PutBucketTagging",
"logging": "PutBucketLogging",
"cors": "PutBucketCORS",
"notification": "PutBucketNotification",
"accelerate": "PutAccelerateConfiguration",
"public_access_block": "PutPublicAccessBlock",
"DEFAULT": "CreateBucket",
},
"DELETE": {
"lifecycle": "PutLifecycleConfiguration",
"policy": "DeleteBucketPolicy",
"website": "DeleteBucketWebsite",
"tagging": "PutBucketTagging",
"cors": "PutBucketCORS",
"public_access_block": "DeletePublicAccessBlock",
"DEFAULT": "DeleteBucket",
},
},
"KEY": {
"HEAD": {"DEFAULT": "HeadObject"},
"GET": {
"uploadId": "ListMultipartUploadParts",
"acl": "GetObjectAcl",
"tagging": "GetObjectTagging",
"versionId": "GetObjectVersion",
"DEFAULT": "GetObject",
},
"PUT": {
"acl": "PutObjectAcl",
"tagging": "PutObjectTagging",
"DEFAULT": "PutObject",
},
"DELETE": {
"uploadId": "AbortMultipartUpload",
"versionId": "DeleteObjectVersion",
"DEFAULT": "DeleteObject",
},
"POST": {
"uploads": "PutObject",
"restore": "RestoreObject",
"uploadId": "PutObject",
"select": "SelectObject",
},
},
"CONTROL": {
"GET": {"publicAccessBlock": "GetPublicAccessBlock"},
"PUT": {"publicAccessBlock": "PutPublicAccessBlock"},
"DELETE": {"publicAccessBlock": "DeletePublicAccessBlock"},
},
}
def parse_key_name(pth: str) -> str:
# strip the first '/' left by urlparse
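    # e.g. "/some/key.txt" -> "some/key.txt"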
return pth[1:] if pth.startswith("/") else pth
class S3Response(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="s3")
        # Whatever format requests come in, we should never touch their bodies.
        # There are some nuances here - this decision should be method-specific, instead of service-specific.
        # E.g.: we don't want to touch put_object(), but we might have to decompress put_object_configuration().
        # Taking the naive approach of never decompressing anything from S3 for now.
self.allow_request_decompression = False
def get_safe_path(self) -> str:
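        # e.g. "/bucket/key%20name" -> "/bucket/key name"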
return unquote(self.raw_path)
@property
def is_access_point(self) -> bool:
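        # Access-point requests arrive on a virtual host such as
        # "<name>-<account-id>.s3-accesspoint.<region>.amazonaws.com" (illustrative)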
return ".s3-accesspoint." in self.headers["host"]
@property
def backend(self) -> S3Backend:
return s3_backends[self.current_account]["global"]
@property
def should_autoescape(self) -> bool:
return True
def all_buckets(self) -> str:
self.data["Action"] = "ListAllMyBuckets"
self._authenticate_and_authorize_s3_action()
# No bucket specified. Listing all buckets
all_buckets = self.backend.list_buckets()
template = self.response_template(S3_ALL_BUCKETS)
return template.render(buckets=all_buckets)
def subdomain_based_buckets(self, request: Any) -> bool:
if settings.S3_IGNORE_SUBDOMAIN_BUCKETNAME:
return False
host = request.headers.get("host", request.headers.get("Host"))
if not host:
host = urlparse(request.url).netloc
custom_endpoints = settings.get_s3_custom_endpoints()
if (
host
and custom_endpoints
and any([host in endpoint for endpoint in custom_endpoints])
):
# Default to path-based buckets for S3-compatible SDKs (Ceph, DigitalOcean Spaces, etc)
return False
if (
not host
or host.startswith("localhost")
or host.startswith("localstack")
or host.startswith("host.docker.internal")
or re.match(r"^[^.]+$", host)
or re.match(r"^.*\.svc\.cluster\.local:?\d*$", host)
):
# Default to path-based buckets for (1) localhost, (2) localstack hosts (e.g. localstack.dev),
# (3) local host names that do not contain a "." (e.g., Docker container host names), or
# (4) kubernetes host names
return False
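        # Default to path-based buckets when the host is a bare IPv4 address (e.g. "127.0.0.1:5000")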
match = re.match(r"^([^\[\]:]+)(:\d+)?$", host)
if match:
match = re.match(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}", match.groups()[0]
)
if match:
return False
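        # Default to path-based buckets when the host is a bracketed IPv6 address (e.g. "[::1]:5000")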
match = re.match(r"^\[(.+)\](:\d+)?$", host)
if match:
match = re.match(
r"^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z",
match.groups()[0],
re.IGNORECASE,
)
if match:
return False
path_based = host == "s3.amazonaws.com" or re.match(
r"s3[\.\-]([^.]*)\.amazonaws\.com", host
)
return not path_based
def is_delete_keys(self) -> bool:
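        # Bulk deletes arrive as "POST /bucket-name?delete" - detect the marker in the querystring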
qs = parse_qs(urlparse(self.path).query, keep_blank_values=True)
return "delete" in qs
def parse_bucket_name_from_url(self, request: Any, url: str) -> str:
bucket_name = ""
if self.subdomain_based_buckets(request):
bucket_name = bucket_name_from_url(url) # type: ignore
else:
bucket_name = bucketpath_bucket_name_from_url(url) # type: ignore
if self.is_access_point:
# import here to avoid circular dependency error
from moto.s3control import s3control_backends
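            # The parsed name is the access-point alias "<name>-<account-id>";
            # strip the "-<account-id>" suffix to recover the access-point name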
ap_name = bucket_name[: -(len(self.current_account) + 1)]
ap = s3control_backends[self.current_account]["global"].get_access_point(
self.current_account, ap_name
)
bucket_name = ap.bucket
return bucket_name
def parse_key_name(self, request: Any, url: str) -> str:
if self.subdomain_based_buckets(request):
return parse_key_name(url)
else:
return bucketpath_parse_key_name(url)
def ambiguous_response(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
# Depending on which calling format the client is using, we don't know
# if this is a bucket or key request so we have to check
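        # e.g. "http://bucket.s3.amazonaws.com/key" (virtual-hosted) vs "http://s3.amazonaws.com/bucket/key" (path-based)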
if self.subdomain_based_buckets(request):
return self.key_response(request, full_url, headers)
else:
# Using path-based buckets
return self.bucket_response(request, full_url, headers)
def bucket_response(
self, request: Any, full_url: str, headers: Any
) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers, use_raw_body=True)
bucket_name = self.parse_bucket_name_from_url(request, full_url)
self.backend.log_incoming_request(request, bucket_name)
try:
response = self._bucket_response(request, full_url)
except S3ClientError as s3error:
response = s3error.code, {}, s3error.description
return self._send_response(response)
@staticmethod
def _send_response(response: Any) -> TYPE_RESPONSE: # type: ignore
if isinstance(response, str):
return 200, {}, response.encode("utf-8")
else:
status_code, headers, response_content = response
if not isinstance(response_content, bytes):
response_content = response_content.encode("utf-8")
return status_code, headers, response_content
def _bucket_response(
self, request: Any, full_url: str
) -> Union[str, TYPE_RESPONSE]:
querystring = self._get_querystring(request, full_url)
method = request.method
region_name = parse_region_from_url(full_url, use_default_region=False)
if region_name is None:
region_name = extract_region_from_aws_authorization(
request.headers.get("Authorization", "")
)
region_name = region_name or DEFAULT_REGION_NAME
bucket_name = self.parse_bucket_name_from_url(request, full_url)
if not bucket_name:
# If no bucket specified, list all buckets
return self.all_buckets()
self.data["BucketName"] = bucket_name
if method == "HEAD":
return self._bucket_response_head(bucket_name, querystring)
elif method == "GET":
return self._bucket_response_get(bucket_name, querystring)
elif method == "PUT":
return self._bucket_response_put(
request, region_name, bucket_name, querystring
)
elif method == "DELETE":
return self._bucket_response_delete(bucket_name, querystring)
elif method == "POST":
return self._bucket_response_post(request, bucket_name)
elif method == "OPTIONS":
return self._response_options(request.headers, bucket_name)
else:
raise NotImplementedError(
f"Method {method} has not been implemented in the S3 backend yet"
)
def _get_querystring(self, request: Any, full_url: str) -> Dict[str, Any]: # type: ignore[misc]
        # Flask's Request has the querystring already parsed
        # In ServerMode, we can use that instead of manually parsing it ourselves
if hasattr(request, "args"):
query_dict = dict()
for key, val in dict(request.args).items():
                # parse_qs returns Dict[str, List[str]]
                # Ensure that we conform to the same response type here
query_dict[key] = val if isinstance(val, list) else [val]
return query_dict
parsed_url = urlparse(full_url)
# full_url can be one of two formats, depending on the version of werkzeug used:
# http://foobaz.localhost:5000/?prefix=bar%2Bbaz
# http://foobaz.localhost:5000/?prefix=bar+baz
        # Werkzeug helpfully encodes the plus-sign for us, from >= 2.1.0
        # However, the `parse_qs` method will (correctly) replace '+' with a space.
        #
        # Workaround - manually reverse the encoding.
        # Keep the '+' encoded so that parse_qs doesn't replace it with a space; it will still unquote %2B to '+' afterwards
qs = (parsed_url.query or "").replace("+", "%2B")
return parse_qs(qs, keep_blank_values=True)
def _bucket_response_head(
self, bucket_name: str, querystring: Dict[str, Any]
) -> TYPE_RESPONSE:
self._set_action("BUCKET", "HEAD", querystring)
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
try:
bucket = self.backend.head_bucket(bucket_name)
except MissingBucket:
# Unless we do this, boto3 does not raise ClientError on
# HEAD (which the real API responds with), and instead
# raises NoSuchBucket, leading to inconsistency in
# error response between real and mocked responses.
return 404, {}, ""
return 200, {"x-amz-bucket-region": bucket.region_name}, ""
def _set_cors_headers_options(
self, headers: Dict[str, str], bucket: FakeBucket
) -> None:
"""
TODO: smarter way of matching the right CORS rule:
See https://docs.aws.amazon.com/AmazonS3/latest/userguide/cors.html
"When Amazon S3 receives a preflight request from a browser, it evaluates
the CORS configuration for the bucket and uses the first CORSRule rule
that matches the incoming browser request to enable a cross-origin request."
        This implementation just applies all rules, so a later rule overrides
        previous ones if they redefine the same headers.
"""
def _to_string(header: Union[List[str], str]) -> str:
            # We allow lists and strs as header values. Transform lists into comma-separated strings
if isinstance(header, list):
return ", ".join(header)
return header
for cors_rule in bucket.cors:
if cors_rule.allowed_methods is not None:
self.response_headers["Access-Control-Allow-Methods"] = _to_string(
cors_rule.allowed_methods
)
if cors_rule.allowed_origins is not None:
origin = headers.get("Origin")
if cors_matches_origin(origin, cors_rule.allowed_origins): # type: ignore
self.response_headers["Access-Control-Allow-Origin"] = origin # type: ignore
else:
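                    # NB: message kept verbatim (typos included) to match the AWS error string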
raise AccessForbidden(
"CORSResponse: This CORS request is not allowed. This is usually because the evalution of Origin, request method / Access-Control-Request-Method or Access-Control-Request-Headers are not whitelisted by the resource's CORS spec."
)
if cors_rule.allowed_headers is not None:
self.response_headers["Access-Control-Allow-Headers"] = _to_string(
cors_rule.allowed_headers
)
if cors_rule.exposed_headers is not None:
self.response_headers["Access-Control-Expose-Headers"] = _to_string(
cors_rule.exposed_headers
)
if cors_rule.max_age_seconds is not None:
self.response_headers["Access-Control-Max-Age"] = _to_string(
cors_rule.max_age_seconds
)
def _response_options(
self, headers: Dict[str, str], bucket_name: str
) -> TYPE_RESPONSE:
# Return 200 with the headers from the bucket CORS configuration
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
try:
bucket = self.backend.head_bucket(bucket_name)
except MissingBucket:
# AWS S3 seems to return 403 on OPTIONS and 404 on GET/HEAD
return 403, {}, ""
self._set_cors_headers_options(headers, bucket)
return 200, self.response_headers, ""
def _get_cors_headers_other(
self, headers: Dict[str, str], bucket_name: str
) -> Dict[str, Any]:
"""
        Returns a dictionary with the appropriate CORS headers.
        Should be used for non-OPTIONS requests only.
        Applicable if the 'Origin' header matches one of the CORS rules - returns an empty dictionary otherwise
"""
response_headers: Dict[str, Any] = dict()
try:
origin = headers.get("Origin")
if not origin:
return response_headers
bucket = self.backend.get_bucket(bucket_name)
def _to_string(header: Union[List[str], str]) -> str:
                # We allow lists and strs as header values. Transform lists into comma-separated strings
if isinstance(header, list):
return ", ".join(header)
return header
for cors_rule in bucket.cors:
if cors_rule.allowed_origins is not None:
if cors_matches_origin(origin, cors_rule.allowed_origins):
response_headers["Access-Control-Allow-Origin"] = origin
if cors_rule.allowed_methods is not None:
response_headers[
"Access-Control-Allow-Methods"
] = _to_string(cors_rule.allowed_methods)
if cors_rule.allowed_headers is not None:
response_headers[
"Access-Control-Allow-Headers"
] = _to_string(cors_rule.allowed_headers)
if cors_rule.exposed_headers is not None:
response_headers[
"Access-Control-Expose-Headers"
] = _to_string(cors_rule.exposed_headers)
if cors_rule.max_age_seconds is not None:
response_headers["Access-Control-Max-Age"] = _to_string(
cors_rule.max_age_seconds
)
return response_headers
except S3ClientError:
pass
return response_headers
def _bucket_response_get(
self, bucket_name: str, querystring: Dict[str, Any]
) -> Union[str, TYPE_RESPONSE]:
self._set_action("BUCKET", "GET", querystring)
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
if "object-lock" in querystring:
(
lock_enabled,
mode,
days,
years,
) = self.backend.get_object_lock_configuration(bucket_name)
template = self.response_template(S3_BUCKET_LOCK_CONFIGURATION)
return template.render(
lock_enabled=lock_enabled, mode=mode, days=days, years=years
)
if "uploads" in querystring:
for unsup in ("delimiter", "max-uploads"):
if unsup in querystring:
raise NotImplementedError(
f"Listing multipart uploads with {unsup} has not been implemented yet."
)
multiparts = list(self.backend.get_all_multiparts(bucket_name).values())
if "prefix" in querystring:
prefix = querystring.get("prefix", [None])[0]
multiparts = [
upload
for upload in multiparts
if upload.key_name.startswith(prefix)
]
template = self.response_template(S3_ALL_MULTIPARTS)
return template.render(
bucket_name=bucket_name,
uploads=multiparts,
account_id=self.current_account,
)
elif "location" in querystring:
location: Optional[str] = self.backend.get_bucket_location(bucket_name)
template = self.response_template(S3_BUCKET_LOCATION)
# us-east-1 is different - returns a None location
if location == DEFAULT_REGION_NAME:
location = None
return template.render(location=location)
elif "lifecycle" in querystring:
rules = self.backend.get_bucket_lifecycle(bucket_name)
if not rules:
template = self.response_template(S3_NO_LIFECYCLE)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_LIFECYCLE_CONFIGURATION)
return template.render(rules=rules)
elif "versioning" in querystring:
versioning = self.backend.get_bucket_versioning(bucket_name)
template = self.response_template(S3_BUCKET_GET_VERSIONING)
return template.render(status=versioning)
elif "policy" in querystring:
policy = self.backend.get_bucket_policy(bucket_name)
if not policy:
template = self.response_template(S3_NO_POLICY)
return 404, {}, template.render(bucket_name=bucket_name)
return 200, {}, policy
elif "website" in querystring:
website_configuration = self.backend.get_bucket_website_configuration(
bucket_name
)
if not website_configuration:
template = self.response_template(S3_NO_BUCKET_WEBSITE_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
return 200, {}, website_configuration # type: ignore
elif "acl" in querystring:
acl = self.backend.get_bucket_acl(bucket_name)
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(acl=acl)
elif "tagging" in querystring:
tags = self.backend.get_bucket_tagging(bucket_name)["Tags"]
# "Special Error" if no tags:
if len(tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return template.render(tags=tags)
elif "logging" in querystring:
logging = self.backend.get_bucket_logging(bucket_name)
if not logging:
template = self.response_template(S3_NO_LOGGING_CONFIG)
return 200, {}, template.render()
template = self.response_template(S3_LOGGING_CONFIG)
return 200, {}, template.render(logging=logging)
elif "cors" in querystring:
cors = self.backend.get_bucket_cors(bucket_name)
if len(cors) == 0:
template = self.response_template(S3_NO_CORS_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_CORS_RESPONSE)
return template.render(cors=cors)
elif "notification" in querystring:
notification_configuration = (
self.backend.get_bucket_notification_configuration(bucket_name)
)
if not notification_configuration:
return 200, {}, ""
template = self.response_template(S3_GET_BUCKET_NOTIFICATION_CONFIG)
return template.render(config=notification_configuration)
elif "accelerate" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if bucket.accelerate_configuration is None:
template = self.response_template(S3_BUCKET_ACCELERATE_NOT_SET)
return 200, {}, template.render()
template = self.response_template(S3_BUCKET_ACCELERATE)
return template.render(bucket=bucket)
elif "publicAccessBlock" in querystring:
public_block_config = self.backend.get_public_access_block(bucket_name)
template = self.response_template(S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION)
return template.render(public_block_config=public_block_config)
elif "versions" in querystring:
delimiter = querystring.get("delimiter", [None])[0]
key_marker = querystring.get("key-marker", [None])[0]
max_keys = int(querystring.get("max-keys", [1000])[0])
prefix = querystring.get("prefix", [""])[0]
version_id_marker = querystring.get("version-id-marker", [None])[0]
bucket = self.backend.get_bucket(bucket_name)
(
versions,
common_prefixes,
delete_markers,
next_key_marker,
next_version_id_marker,
) = self.backend.list_object_versions(
bucket_name,
delimiter=delimiter,
key_marker=key_marker,
max_keys=max_keys,
prefix=prefix,
version_id_marker=version_id_marker,
)
key_list = versions
is_truncated = False
if next_key_marker is not None:
is_truncated = True
template = self.response_template(S3_BUCKET_GET_VERSIONS)
return (
200,
{},
template.render(
common_prefixes=common_prefixes,
key_list=key_list,
delete_marker_list=delete_markers,
bucket=bucket,
prefix=prefix,
max_keys=max_keys,
delimiter=delimiter,
key_marker=key_marker,
version_id_marker=version_id_marker,
is_truncated=is_truncated,
next_key_marker=next_key_marker,
next_version_id_marker=next_version_id_marker,
),
)
elif "encryption" in querystring:
encryption = self.backend.get_bucket_encryption(bucket_name)
if not encryption:
template = self.response_template(S3_NO_ENCRYPTION)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_ENCRYPTION_CONFIG)
return 200, {}, template.render(encryption=encryption)
elif querystring.get("list-type", [None])[0] == "2":
return 200, {}, self._handle_list_objects_v2(bucket_name, querystring)
elif "replication" in querystring:
replication = self.backend.get_bucket_replication(bucket_name)
if not replication:
template = self.response_template(S3_NO_REPLICATION)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_REPLICATION_CONFIG)
return 200, {}, template.render(replication=replication)
elif "ownershipControls" in querystring:
ownership_rule = self.backend.get_bucket_ownership_controls(bucket_name)
if not ownership_rule:
template = self.response_template(S3_ERROR_BUCKET_ONWERSHIP_NOT_FOUND)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_GET_OWNERSHIP_RULE)
return 200, {}, template.render(ownership_rule=ownership_rule)
bucket = self.backend.get_bucket(bucket_name)
prefix = querystring.get("prefix", [None])[0]
if prefix and isinstance(prefix, bytes):
prefix = prefix.decode("utf-8")
delimiter = querystring.get("delimiter", [None])[0]
max_keys = int(querystring.get("max-keys", [1000])[0])
marker = querystring.get("marker", [None])[0]
result_keys, result_folders = self.backend.list_objects(
bucket, prefix, delimiter
)
encoding_type = querystring.get("encoding-type", [None])[0]
if marker:
result_keys = self._get_results_from_token(result_keys, marker)
result_keys, is_truncated, next_marker = self._truncate_result(
result_keys, max_keys
)
template = self.response_template(S3_BUCKET_GET_RESPONSE)
return (
200,
{},
template.render(
bucket=bucket,
prefix=prefix,
delimiter=delimiter,
result_keys=result_keys,
result_folders=result_folders,
is_truncated=is_truncated,
next_marker=next_marker,
max_keys=max_keys,
encoding_type=encoding_type,
),
)
def _set_action(
self, action_resource_type: str, method: str, querystring: Dict[str, Any]
) -> None:
action_set = False
for action_in_querystring, action in ACTION_MAP[action_resource_type][
method
].items():
if action_in_querystring in querystring:
self.data["Action"] = action
action_set = True
if not action_set:
self.data["Action"] = ACTION_MAP[action_resource_type][method]["DEFAULT"]
def _handle_list_objects_v2(
self, bucket_name: str, querystring: Dict[str, Any]
) -> str:
template = self.response_template(S3_BUCKET_GET_RESPONSE_V2)
bucket = self.backend.get_bucket(bucket_name)
continuation_token = querystring.get("continuation-token", [None])[0]
        if continuation_token == "":
raise InvalidContinuationToken()
prefix = querystring.get("prefix", [None])[0]
if prefix and isinstance(prefix, bytes):
prefix = prefix.decode("utf-8")
delimiter = querystring.get("delimiter", [None])[0]
all_keys = self.backend.list_objects_v2(bucket, prefix, delimiter)
fetch_owner = querystring.get("fetch-owner", [False])[0]
max_keys = int(querystring.get("max-keys", [1000])[0])
start_after = querystring.get("start-after", [None])[0]
encoding_type = querystring.get("encoding-type", [None])[0]
if continuation_token or start_after:
limit = continuation_token or start_after
all_keys = self._get_results_from_token(all_keys, limit)
truncated_keys, is_truncated, next_continuation_token = self._truncate_result(
all_keys, max_keys
)
result_keys, result_folders = self._split_truncated_keys(truncated_keys)
key_count = len(result_keys) + len(result_folders)
if encoding_type == "url":
prefix = urllib.parse.quote(prefix) if prefix else ""
result_folders = list(
map(lambda folder: urllib.parse.quote(folder), result_folders)
)
return template.render(
bucket=bucket,
prefix=prefix or "",
delimiter=delimiter,
key_count=key_count,
result_keys=result_keys,
result_folders=result_folders,
fetch_owner=fetch_owner,
max_keys=max_keys,
is_truncated=is_truncated,
next_continuation_token=next_continuation_token,
start_after=None if continuation_token else start_after,
encoding_type=encoding_type,
)
@staticmethod
def _split_truncated_keys(truncated_keys: Any) -> Any: # type: ignore[misc]
result_keys = []
result_folders = []
for key in truncated_keys:
if isinstance(key, FakeKey):
result_keys.append(key)
else:
result_folders.append(key)
return result_keys, result_folders
def _get_results_from_token(self, result_keys: Any, token: Any) -> Any:
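        # Resume listing after the marker/token: skip every key that sorts lexicographically at or before it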
continuation_index = 0
for key in result_keys:
if (key.name if isinstance(key, FakeKey) else key) > token:
break
continuation_index += 1
return result_keys[continuation_index:]
def _truncate_result(self, result_keys: Any, max_keys: int) -> Any:
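        # S3 returns at most max-keys entries; when truncating, the last returned key becomes the continuation token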
if max_keys == 0:
result_keys = []
is_truncated = True
next_continuation_token = None
elif len(result_keys) > max_keys:
is_truncated = "true" # type: ignore
result_keys = result_keys[:max_keys]
item = result_keys[-1]
next_continuation_token = item.name if isinstance(item, FakeKey) else item
else:
is_truncated = "false" # type: ignore
next_continuation_token = None
return result_keys, is_truncated, next_continuation_token
def _body_contains_location_constraint(self, body: bytes) -> bool:
if body:
try:
xmltodict.parse(body)["CreateBucketConfiguration"]["LocationConstraint"]
return True
except KeyError:
pass
return False
def _create_bucket_configuration_is_empty(self, body: bytes) -> bool:
if body:
try:
create_bucket_configuration = xmltodict.parse(body)[
"CreateBucketConfiguration"
]
del create_bucket_configuration["@xmlns"]
if len(create_bucket_configuration) == 0:
return True
except KeyError:
pass
return False
def _parse_pab_config(self) -> Dict[str, Any]:
parsed_xml = xmltodict.parse(self.body)
parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None)
return parsed_xml
def _bucket_response_put(
self,
request: Any,
region_name: str,
bucket_name: str,
querystring: Dict[str, Any],
) -> Union[str, TYPE_RESPONSE]:
if querystring and not request.headers.get("Content-Length"):
return 411, {}, "Content-Length required"
self._set_action("BUCKET", "PUT", querystring)
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
if "object-lock" in querystring:
config = self._lock_config_from_body()
if not self.backend.get_bucket(bucket_name).object_lock_enabled:
raise BucketMustHaveLockeEnabled
self.backend.put_object_lock_configuration(
bucket_name,
config.get("enabled"), # type: ignore
config.get("mode"),
config.get("days"),
config.get("years"),
)
return 200, {}, ""
if "versioning" in querystring:
body = self.body.decode("utf-8")
            # Extract the <Status> value ("Enabled" or "Suspended") from the XML body
            ver = re.search(r"<Status>([A-Za-z]+)</Status>", body)
if ver:
self.backend.put_bucket_versioning(bucket_name, ver.group(1))
template = self.response_template(S3_BUCKET_VERSIONING)
return template.render(bucket_versioning_status=ver.group(1))
else:
return 404, {}, ""
elif "lifecycle" in querystring:
rules = xmltodict.parse(self.body)["LifecycleConfiguration"]["Rule"]
if not isinstance(rules, list):
                # If there is only one rule, xmltodict returns just the item
rules = [rules]
self.backend.put_bucket_lifecycle(bucket_name, rules)
return ""
elif "policy" in querystring:
self.backend.put_bucket_policy(bucket_name, self.body)
return "True"
elif "acl" in querystring:
# Headers are first. If not set, then look at the body (consistent with the documentation):
acls = self._acl_from_headers(request.headers)
if not acls:
acls = self._acl_from_body()
self.backend.put_bucket_acl(bucket_name, acls)
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_body()
self.backend.put_bucket_tagging(bucket_name, tagging)
return 204, {}, ""
elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, self.body)
return ""
elif "cors" in querystring:
try:
self.backend.put_bucket_cors(bucket_name, self._cors_from_body())
return ""
except KeyError:
raise MalformedXML()
elif "logging" in querystring:
try:
self.backend.put_bucket_logging(bucket_name, self._logging_from_body())
return ""
except KeyError:
raise MalformedXML()
elif "notification" in querystring:
try:
self.backend.put_bucket_notification_configuration(
bucket_name, self._notification_config_from_body()
)
return ""
except KeyError:
raise MalformedXML()
elif "accelerate" in querystring:
try:
accelerate_status = self._accelerate_config_from_body()
self.backend.put_bucket_accelerate_configuration(
bucket_name, accelerate_status
)
return ""
except KeyError:
raise MalformedXML()
elif "publicAccessBlock" in querystring:
pab_config = self._parse_pab_config()
self.backend.put_public_access_block(
bucket_name, pab_config["PublicAccessBlockConfiguration"]
)
return ""
elif "encryption" in querystring:
try:
self.backend.put_bucket_encryption(
bucket_name, self._encryption_config_from_body()
)
return ""
except KeyError:
raise MalformedXML()
elif "replication" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.is_versioned:
template = self.response_template(S3_NO_VERSIONING_ENABLED)
return 400, {}, template.render(bucket_name=bucket_name)
replication_config = self._replication_config_from_xml(self.body)
self.backend.put_bucket_replication(bucket_name, replication_config)
return ""
elif "ownershipControls" in querystring:
ownership_rule = self._ownership_rule_from_body()
self.backend.put_bucket_ownership_controls(
bucket_name, ownership=ownership_rule
)
return ""
else:
            # us-east-1, the default AWS region, behaves a bit differently:
            # - you should not use it as a location constraint --> it fails
            # - querying the location constraint returns None
            # - LocationConstraint has to be specified if outside us-east-1
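            # An explicit constraint looks like this (region value illustrative):
            #   <CreateBucketConfiguration>
            #     <LocationConstraint>eu-west-1</LocationConstraint>
            #   </CreateBucketConfiguration>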
if (
region_name != DEFAULT_REGION_NAME
and not self._body_contains_location_constraint(self.body)
):
raise IllegalLocationConstraintException()
if self.body:
if self._create_bucket_configuration_is_empty(self.body):
raise MalformedXML()
try:
forced_region = xmltodict.parse(self.body)[
"CreateBucketConfiguration"
]["LocationConstraint"]
if forced_region == DEFAULT_REGION_NAME:
raise S3ClientError(
"InvalidLocationConstraint",
"The specified location-constraint is not valid",
)
else:
region_name = forced_region
except KeyError:
pass
try:
new_bucket = self.backend.create_bucket(bucket_name, region_name)
except BucketAlreadyExists:
new_bucket = self.backend.get_bucket(bucket_name)
if new_bucket.account_id == self.get_current_account():
# special cases when the bucket belongs to self
if (
new_bucket.region_name == DEFAULT_REGION_NAME
and region_name == DEFAULT_REGION_NAME
):
# us-east-1 has different behavior - creating a bucket there is an idempotent operation
pass
else:
template = self.response_template(S3_DUPLICATE_BUCKET_ERROR)
return 409, {}, template.render(bucket_name=bucket_name)
else:
raise
if "x-amz-acl" in request.headers:
# TODO: Support the XML-based ACL format
self.backend.put_bucket_acl(
bucket_name, self._acl_from_headers(request.headers)
)
if (
request.headers.get("x-amz-bucket-object-lock-enabled", "").lower()
== "true"
):
new_bucket.object_lock_enabled = True
new_bucket.versioning_status = "Enabled"
ownership_rule = request.headers.get("x-amz-object-ownership")
if ownership_rule:
new_bucket.ownership_rule = ownership_rule
template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
return 200, {}, template.render(bucket=new_bucket)
def _bucket_response_delete(
self, bucket_name: str, querystring: Dict[str, Any]
) -> TYPE_RESPONSE:
self._set_action("BUCKET", "DELETE", querystring)
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
if "policy" in querystring:
self.backend.delete_bucket_policy(bucket_name)
return 204, {}, ""
elif "tagging" in querystring:
self.backend.delete_bucket_tagging(bucket_name)
return 204, {}, ""
elif "website" in querystring:
self.backend.delete_bucket_website(bucket_name)
return 204, {}, ""
elif "cors" in querystring:
self.backend.delete_bucket_cors(bucket_name)
return 204, {}, ""
elif "lifecycle" in querystring:
self.backend.delete_bucket_lifecycle(bucket_name)
return 204, {}, ""
elif "publicAccessBlock" in querystring:
self.backend.delete_public_access_block(bucket_name)
return 204, {}, ""
elif "encryption" in querystring:
self.backend.delete_bucket_encryption(bucket_name)
return 204, {}, ""
elif "replication" in querystring:
self.backend.delete_bucket_replication(bucket_name)
return 204, {}, ""
elif "ownershipControls" in querystring:
self.backend.delete_bucket_ownership_controls(bucket_name)
return 204, {}, ""
removed_bucket = self.backend.delete_bucket(bucket_name)
if removed_bucket:
# Bucket exists
template = self.response_template(S3_DELETE_BUCKET_SUCCESS)
return 204, {}, template.render(bucket=removed_bucket)
else:
# Tried to delete a bucket that still has keys
template = self.response_template(S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
return 409, {}, template.render(bucket=removed_bucket)
def _bucket_response_post(self, request: Any, bucket_name: str) -> TYPE_RESPONSE:
response_headers = {}
if not request.headers.get("Content-Length"):
return 411, {}, "Content-Length required"
self.path = self._get_path(request)
if self.is_delete_keys():
self.data["Action"] = "DeleteObject"
try:
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
return self._bucket_response_delete_keys(bucket_name)
except BucketAccessDeniedError:
return self._bucket_response_delete_keys(
bucket_name, authenticated=False
)
self.data["Action"] = "PutObject"
self._authenticate_and_authorize_s3_action(bucket_name=bucket_name)
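        # Browser-based POST upload: the object key and payload come from the submitted form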
key = self.querystring["key"][0]
f = self.body
if "success_action_redirect" in self.querystring:
redirect = self.querystring["success_action_redirect"][0]
parts = urlparse(redirect)
queryargs: Dict[str, Any] = parse_qs(parts.query)
queryargs["key"] = key
queryargs["bucket"] = bucket_name
redirect_queryargs = urlencode(queryargs, doseq=True)
newparts = (
parts.scheme,
parts.netloc,
parts.path,
parts.params,
redirect_queryargs,
parts.fragment,
)
fixed_redirect = urlunparse(newparts)
response_headers["Location"] = fixed_redirect
if "success_action_status" in self.querystring:
status_code = self.querystring["success_action_status"][0]
elif "success_action_redirect" in self.querystring:
status_code = 303
else:
status_code = 204
new_key = self.backend.put_object(bucket_name, key, f)
if self.querystring.get("acl"):
acl = get_canned_acl(self.querystring["acl"][0]) # type: ignore
new_key.set_acl(acl)
# Metadata
metadata = metadata_from_headers(self.form_data)
new_key.set_metadata(metadata)
return status_code, response_headers, ""
@staticmethod
def _get_path(request: Any) -> str: # type: ignore[misc]
return (
request.full_path
if hasattr(request, "full_path")
else path_url(request.url)
)
def _bucket_response_delete_keys(
self, bucket_name: str, authenticated: bool = True
) -> TYPE_RESPONSE:
template = self.response_template(S3_DELETE_KEYS_RESPONSE)
body_dict = xmltodict.parse(self.body, strip_whitespace=False)
objects = body_dict["Delete"].get("Object", [])
if not isinstance(objects, list):
# We expect a list of objects, but when there is a single