from __future__ import unicode_literals
import re
import six
from six.moves.urllib.parse import parse_qs, urlparse
import xmltodict
from moto.core.responses import _TemplateEnvironmentMixin
from .exceptions import BucketAlreadyExists, S3ClientError, InvalidPartOrder
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl
from .utils import bucket_name_from_url, metadata_from_headers
from xml.dom import minidom
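# Matches the region in virtual-host style bucket URLs,
# e.g. "bucket.s3-eu-west-1.amazonaws.com" captures "eu-west-1".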
REGION_URL_REGEX = r'\.s3-(.+?)\.amazonaws\.com'
DEFAULT_REGION_NAME = 'us-east-1'
def parse_key_name(pth):
return pth.lstrip("/")
class ResponseObject(_TemplateEnvironmentMixin):
def __init__(self, backend, bucket_name_from_url, parse_key_name,
is_delete_keys=None):
super(ResponseObject, self).__init__()
self.backend = backend
self.bucket_name_from_url = bucket_name_from_url
self.parse_key_name = parse_key_name
if is_delete_keys:
self.is_delete_keys = is_delete_keys
@staticmethod
def is_delete_keys(path, bucket_name):
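        # boto and similar clients issue multi-object deletes as "POST /?delete";
        # the constructor lets callers swap in a different check for servers
        # whose path handling differs.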
return path == u'/?delete'
def all_buckets(self):
# No bucket specified. Listing all buckets
all_buckets = self.backend.get_all_buckets()
template = self.response_template(S3_ALL_BUCKETS)
return template.render(buckets=all_buckets)
def bucket_response(self, request, full_url, headers):
try:
response = self._bucket_response(request, full_url, headers)
except S3ClientError as s3error:
response = s3error.code, headers, s3error.description
if isinstance(response, six.string_types):
return 200, headers, response.encode("utf-8")
else:
status_code, headers, response_content = response
return status_code, headers, response_content.encode("utf-8")
def _bucket_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
querystring = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method
region_name = DEFAULT_REGION_NAME
region_match = re.search(REGION_URL_REGEX, full_url)
if region_match:
region_name = region_match.groups()[0]
bucket_name = self.bucket_name_from_url(full_url)
if not bucket_name:
# If no bucket specified, list all buckets
return self.all_buckets()
if hasattr(request, 'body'):
# Boto
body = request.body
else:
# Flask server
body = request.data
        if body is None:
            body = b''
        body = body.decode('utf-8')
if method == 'HEAD':
return self._bucket_response_head(bucket_name, headers)
elif method == 'GET':
return self._bucket_response_get(bucket_name, querystring, headers)
elif method == 'PUT':
return self._bucket_response_put(request, body, region_name, bucket_name, querystring, headers)
elif method == 'DELETE':
return self._bucket_response_delete(body, bucket_name, querystring, headers)
elif method == 'POST':
return self._bucket_response_post(request, bucket_name, headers)
else:
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))
def _bucket_response_head(self, bucket_name, headers):
self.backend.get_bucket(bucket_name)
return 200, headers, ""
def _bucket_response_get(self, bucket_name, querystring, headers):
if 'uploads' in querystring:
for unsup in ('delimiter', 'max-uploads'):
if unsup in querystring:
raise NotImplementedError("Listing multipart uploads with {} has not been implemented yet.".format(unsup))
multiparts = list(self.backend.get_all_multiparts(bucket_name).values())
if 'prefix' in querystring:
prefix = querystring.get('prefix', [None])[0]
multiparts = [upload for upload in multiparts if upload.key_name.startswith(prefix)]
template = self.response_template(S3_ALL_MULTIPARTS)
return template.render(
bucket_name=bucket_name,
uploads=multiparts)
elif 'location' in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_BUCKET_LOCATION)
return template.render(location=bucket.location)
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.rules:
return 404, headers, "NoSuchLifecycleConfiguration"
template = self.response_template(S3_BUCKET_LIFECYCLE_CONFIGURATION)
return template.render(rules=bucket.rules)
elif 'versioning' in querystring:
versioning = self.backend.get_bucket_versioning(bucket_name)
template = self.response_template(S3_BUCKET_GET_VERSIONING)
return template.render(status=versioning)
elif 'policy' in querystring:
policy = self.backend.get_bucket_policy(bucket_name)
if not policy:
template = self.response_template(S3_NO_POLICY)
return 404, headers, template.render(bucket_name=bucket_name)
return 200, headers, policy
elif 'website' in querystring:
website_configuration = self.backend.get_bucket_website_configuration(bucket_name)
return website_configuration
elif 'acl' in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket)
elif 'versions' in querystring:
delimiter = querystring.get('delimiter', [None])[0]
encoding_type = querystring.get('encoding-type', [None])[0]
key_marker = querystring.get('key-marker', [None])[0]
max_keys = querystring.get('max-keys', [None])[0]
prefix = querystring.get('prefix', [None])[0]
version_id_marker = querystring.get('version-id-marker', [None])[0]
bucket = self.backend.get_bucket(bucket_name)
versions = self.backend.get_bucket_versions(
bucket_name,
delimiter=delimiter,
encoding_type=encoding_type,
key_marker=key_marker,
max_keys=max_keys,
version_id_marker=version_id_marker
)
template = self.response_template(S3_BUCKET_GET_VERSIONS)
return 200, headers, template.render(
key_list=versions,
bucket=bucket,
prefix='',
max_keys='',
delimiter='',
is_truncated='false',
)
bucket = self.backend.get_bucket(bucket_name)
prefix = querystring.get('prefix', [None])[0]
delimiter = querystring.get('delimiter', [None])[0]
result_keys, result_folders = self.backend.prefix_query(bucket, prefix, delimiter)
template = self.response_template(S3_BUCKET_GET_RESPONSE)
return 200, headers, template.render(
bucket=bucket,
prefix=prefix,
delimiter=delimiter,
result_keys=result_keys,
result_folders=result_folders
)
def _bucket_response_put(self, request, body, region_name, bucket_name, querystring, headers):
if 'versioning' in querystring:
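            # The body is a VersioningConfiguration document, e.g.
            # <VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>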
            ver = re.search('<Status>([A-Za-z]+)</Status>', body)
if ver:
self.backend.set_bucket_versioning(bucket_name, ver.group(1))
template = self.response_template(S3_BUCKET_VERSIONING)
return template.render(bucket_versioning_status=ver.group(1))
else:
return 404, headers, ""
elif 'lifecycle' in querystring:
rules = xmltodict.parse(body)['LifecycleConfiguration']['Rule']
if not isinstance(rules, list):
                # If there is only one rule, xmltodict returns just the item
rules = [rules]
self.backend.set_bucket_lifecycle(bucket_name, rules)
return ""
elif 'policy' in querystring:
self.backend.set_bucket_policy(bucket_name, body)
return 'True'
elif 'acl' in querystring:
acl = self._acl_from_headers(request.headers)
# TODO: Support the XML-based ACL format
self.backend.set_bucket_acl(bucket_name, acl)
return ""
elif 'website' in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
return ""
else:
try:
new_bucket = self.backend.create_bucket(bucket_name, region_name)
except BucketAlreadyExists:
if region_name == DEFAULT_REGION_NAME:
# us-east-1 has different behavior
new_bucket = self.backend.get_bucket(bucket_name)
else:
raise
template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
return 200, headers, template.render(bucket=new_bucket)
def _bucket_response_delete(self, body, bucket_name, querystring, headers):
if 'policy' in querystring:
self.backend.delete_bucket_policy(bucket_name, body)
return 204, headers, ""
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
bucket.delete_lifecycle()
return 204, headers, ""
removed_bucket = self.backend.delete_bucket(bucket_name)
if removed_bucket:
# Bucket exists
template = self.response_template(S3_DELETE_BUCKET_SUCCESS)
return 204, headers, template.render(bucket=removed_bucket)
else:
# Tried to delete a bucket that still has keys
template = self.response_template(S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
return 409, headers, template.render(bucket=removed_bucket)
def _bucket_response_post(self, request, bucket_name, headers):
if self.is_delete_keys(request.path, bucket_name):
return self._bucket_response_delete_keys(request, bucket_name, headers)
# POST to bucket-url should create file from form
if hasattr(request, 'form'):
# Not HTTPretty
form = request.form
else:
# HTTPretty, build new form object
form = {}
            for kv in request.body.decode('utf-8').split('&'):
                # Split on the first '=' only, since values may contain '='
                k, v = kv.split('=', 1)
                form[k] = v
key = form['key']
if 'file' in form:
f = form['file']
else:
f = request.files['file'].stream.read()
new_key = self.backend.set_key(bucket_name, key, f)
# Metadata
metadata = metadata_from_headers(form)
new_key.set_metadata(metadata)
return 200, headers, ""
def _bucket_response_delete_keys(self, request, bucket_name, headers):
template = self.response_template(S3_DELETE_KEYS_RESPONSE)
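        # The request body is a multi-object Delete document, e.g.
        # <Delete><Object><Key>sample1.txt</Key></Object>...</Delete>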
keys = minidom.parseString(request.body.decode('utf-8')).getElementsByTagName('Key')
deleted_names = []
error_names = []
for k in keys:
try:
key_name = k.firstChild.nodeValue
self.backend.delete_key(bucket_name, key_name)
deleted_names.append(key_name)
except KeyError:
error_names.append(key_name)
return 200, headers, template.render(deleted=deleted_names, delete_errors=error_names)
def _handle_range_header(self, request, headers, response_content):
length = len(response_content)
last = length - 1
_, rspec = request.headers.get('range').split('=')
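        # A single range spec is "first-last" (e.g. bytes=0-499), "first-"
        # (from offset to end of object) or "-suffix" (the last N bytes).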
if ',' in rspec:
raise NotImplementedError(
"Multiple range specifiers not supported")
        def toint(i):
            return int(i) if i else None
begin, end = map(toint, rspec.split('-'))
if begin is not None: # byte range
end = last if end is None else min(end, last)
elif end is not None: # suffix byte range
begin = length - min(end, length)
end = last
else:
return 400, headers, ""
if begin < 0 or end > last or begin > min(end, last):
return 416, headers, ""
headers['content-range'] = "bytes {0}-{1}/{2}".format(
begin, end, length)
return 206, headers, response_content[begin:end + 1]
def key_response(self, request, full_url, headers):
try:
response = self._key_response(request, full_url, headers)
except S3ClientError as s3error:
response = s3error.code, headers, s3error.description
if isinstance(response, six.string_types):
status_code = 200
response_content = response
else:
status_code, headers, response_content = response
if status_code == 200 and 'range' in request.headers:
return self._handle_range_header(request, headers, response_content)
return status_code, headers, response_content
def _key_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method
key_name = self.parse_key_name(parsed_url.path)
bucket_name = self.bucket_name_from_url(full_url)
if hasattr(request, 'body'):
# Boto
body = request.body
else:
# Flask server
body = request.data
if method == 'GET':
return self._key_response_get(bucket_name, query, key_name, headers)
elif method == 'PUT':
return self._key_response_put(request, body, bucket_name, query, key_name, headers)
elif method == 'HEAD':
return self._key_response_head(bucket_name, key_name, headers)
elif method == 'DELETE':
return self._key_response_delete(bucket_name, query, key_name, headers)
elif method == 'POST':
return self._key_response_post(request, body, bucket_name, query, key_name, headers)
else:
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))
def _key_response_get(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
parts = self.backend.list_multipart(bucket_name, upload_id)
template = self.response_template(S3_MULTIPART_LIST_RESPONSE)
return 200, headers, template.render(
bucket_name=bucket_name,
key_name=key_name,
upload_id=upload_id,
count=len(parts),
parts=parts
)
version_id = query.get('versionId', [None])[0]
key = self.backend.get_key(
bucket_name, key_name, version_id=version_id)
if 'acl' in query:
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return 200, headers, template.render(obj=key)
if key:
headers.update(key.metadata)
return 200, headers, key.value
else:
return 404, headers, ""
def _key_response_put(self, request, body, bucket_name, query, key_name, headers):
if query.get('uploadId') and query.get('partNumber'):
upload_id = query['uploadId'][0]
part_number = int(query['partNumber'][0])
if 'x-amz-copy-source' in request.headers:
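                # x-amz-copy-source holds "<source-bucket>/<source-key>";
                # split on the first slash to recover both parts.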
src = request.headers.get("x-amz-copy-source")
src_bucket, src_key = src.split("/", 1)
key = self.backend.copy_part(
bucket_name, upload_id, part_number, src_bucket,
src_key)
template = self.response_template(S3_MULTIPART_UPLOAD_RESPONSE)
response = template.render(part=key)
else:
key = self.backend.set_part(
bucket_name, upload_id, part_number, body)
response = ""
headers.update(key.response_dict)
return 200, headers, response
storage_class = request.headers.get('x-amz-storage-class', 'STANDARD')
acl = self._acl_from_headers(request.headers)
if 'acl' in query:
key = self.backend.get_key(bucket_name, key_name)
# TODO: Support the XML-based ACL format
key.set_acl(acl)
return 200, headers, ""
if 'x-amz-copy-source' in request.headers:
# Copy key
src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1)
self.backend.copy_key(src_bucket, src_key, bucket_name, key_name,
storage=storage_class, acl=acl)
            mdirective = request.headers.get('x-amz-metadata-directive')
            if mdirective == 'REPLACE':
new_key = self.backend.get_key(bucket_name, key_name)
metadata = metadata_from_headers(request.headers)
new_key.set_metadata(metadata, replace=True)
template = self.response_template(S3_OBJECT_COPY_RESPONSE)
return template.render(key=src_key)
streaming_request = hasattr(request, 'streaming') and request.streaming
closing_connection = headers.get('connection') == 'close'
if closing_connection and streaming_request:
# Closing the connection of a streaming request. No more data
new_key = self.backend.get_key(bucket_name, key_name)
elif streaming_request:
# Streaming request, more data
new_key = self.backend.append_to_key(bucket_name, key_name, body)
else:
# Initial data
new_key = self.backend.set_key(bucket_name, key_name, body,
storage=storage_class)
request.streaming = True
metadata = metadata_from_headers(request.headers)
new_key.set_metadata(metadata)
new_key.set_acl(acl)
template = self.response_template(S3_OBJECT_RESPONSE)
headers.update(new_key.response_dict)
return 200, headers, template.render(key=new_key)
def _key_response_head(self, bucket_name, key_name, headers):
key = self.backend.get_key(bucket_name, key_name)
if key:
headers.update(key.metadata)
headers.update(key.response_dict)
return 200, headers, key.value
else:
return 404, headers, ""
def _acl_from_headers(self, headers):
canned_acl = headers.get('x-amz-acl', '')
if canned_acl:
return get_canned_acl(canned_acl)
grants = []
for header, value in headers.items():
if not header.startswith('x-amz-grant-'):
continue
permission = {
'read': 'READ',
'write': 'WRITE',
'read-acp': 'READ_ACP',
'write-acp': 'WRITE_ACP',
'full-control': 'FULL_CONTROL',
}[header[len('x-amz-grant-'):]]
grantees = []
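            # Grant header values are comma-separated grantees such as
            # id="<canonical-user-id>" or uri="http://acs.amazonaws.com/groups/global/AllUsers"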
for key_and_value in value.split(","):
key, value = re.match('([^=]+)="([^"]+)"', key_and_value.strip()).groups()
if key.lower() == 'id':
grantees.append(FakeGrantee(id=value))
else:
grantees.append(FakeGrantee(uri=value))
grants.append(FakeGrant(grantees, [permission]))
if grants:
return FakeAcl(grants)
else:
return None
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
self.backend.cancel_multipart(bucket_name, upload_id)
return 204, headers, ""
try:
removed_key = self.backend.delete_key(bucket_name, key_name)
except KeyError:
return 404, headers, ""
template = self.response_template(S3_DELETE_OBJECT_SUCCESS)
return 204, headers, template.render(bucket=removed_key)
def _complete_multipart_body(self, body):
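        # Yields (part_number, etag) pairs from a CompleteMultipartUpload body:
        # <CompleteMultipartUpload>
        #   <Part><PartNumber>1</PartNumber><ETag>"etag"</ETag></Part>
        #   ...
        # </CompleteMultipartUpload>
        # Part numbers must arrive in strictly increasing order.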
ps = minidom.parseString(body).getElementsByTagName('Part')
prev = 0
for p in ps:
pn = int(p.getElementsByTagName('PartNumber')[0].firstChild.wholeText)
if pn <= prev:
raise InvalidPartOrder()
yield (pn, p.getElementsByTagName('ETag')[0].firstChild.wholeText)
def _key_response_post(self, request, body, bucket_name, query, key_name, headers):
if body == b'' and 'uploads' in query:
metadata = metadata_from_headers(request.headers)
multipart = self.backend.initiate_multipart(bucket_name, key_name, metadata)
template = self.response_template(S3_MULTIPART_INITIATE_RESPONSE)
response = template.render(
bucket_name=bucket_name,
key_name=key_name,
upload_id=multipart.id,
)
return 200, headers, response
if query.get('uploadId'):
body = self._complete_multipart_body(body)
upload_id = query['uploadId'][0]
key = self.backend.complete_multipart(bucket_name, upload_id, body)
template = self.response_template(S3_MULTIPART_COMPLETE_RESPONSE)
return template.render(
bucket_name=bucket_name,
key_name=key.name,
etag=key.etag,
)
elif 'restore' in query:
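            # The restore request body carries the restore period, e.g.
            # <RestoreRequest><Days>7</Days></RestoreRequest>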
es = minidom.parseString(body).getElementsByTagName('Days')
days = es[0].childNodes[0].wholeText
key = self.backend.get_key(bucket_name, key_name)
r = 202
if key.expiry_date is not None:
r = 200
key.restore(int(days))
return r, headers, ""
else:
raise NotImplementedError("Method POST had only been implemented for multipart uploads and restore operations, so far")
S3ResponseInstance = ResponseObject(s3_backend, bucket_name_from_url, parse_key_name)
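# moto's URL map (moto/s3/urls.py) is expected to route bucket-level URLs to
# S3ResponseInstance.bucket_response and key-level URLs to
# S3ResponseInstance.key_response.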
S3_ALL_BUCKETS = """<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
  <Owner>
    <ID>bcaf1ffd86f41161ca5fb16fd081034f</ID>
    <DisplayName>webfile</DisplayName>
  </Owner>
  <Buckets>
    {% for bucket in buckets %}
      <Bucket>
        <Name>{{ bucket.name }}</Name>
        <CreationDate>2006-02-03T16:45:09.000Z</CreationDate>
      </Bucket>
    {% endfor %}
  </Buckets>
</ListAllMyBucketsResult>"""
S3_BUCKET_GET_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Name>{{ bucket.name }}</Name>
  <Prefix>{{ prefix }}</Prefix>
  <MaxKeys>1000</MaxKeys>
  <Delimiter>{{ delimiter }}</Delimiter>
  <IsTruncated>false</IsTruncated>
  {% for key in result_keys %}
    <Contents>
      <Key>{{ key.name }}</Key>
      <LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
      <ETag>{{ key.etag }}</ETag>
      <Size>{{ key.size }}</Size>
      <StorageClass>{{ key.storage_class }}</StorageClass>
      <Owner>
        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
        <DisplayName>webfile</DisplayName>
      </Owner>
    </Contents>
  {% endfor %}
  {% if delimiter %}
    {% for folder in result_folders %}
      <CommonPrefixes>
        <Prefix>{{ folder }}</Prefix>
      </CommonPrefixes>
    {% endfor %}
  {% endif %}
</ListBucketResult>"""
S3_BUCKET_CREATE_RESPONSE = """<CreateBucketResponse xmlns="http://s3.amazonaws.com/doc/2006-03-01">
  <CreateBucketResponse>
    <Bucket>{{ bucket.name }}</Bucket>
  </CreateBucketResponse>
</CreateBucketResponse>"""
S3_DELETE_BUCKET_SUCCESS = """<DeleteBucketResponse xmlns="http://s3.amazonaws.com/doc/2006-03-01">
  <DeleteBucketResponse>
    <Code>204</Code>
    <Description>No Content</Description>
  </DeleteBucketResponse>
</DeleteBucketResponse>"""
S3_DELETE_BUCKET_WITH_ITEMS_ERROR = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>BucketNotEmpty</Code>
  <Message>The bucket you tried to delete is not empty</Message>
  <BucketName>{{ bucket.name }}</BucketName>
  <RequestId>asdfasdfsdafds</RequestId>
  <HostId>sdfgdsfgdsfgdfsdsfgdfs</HostId>
</Error>"""
S3_BUCKET_LOCATION = """<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{ location }}</LocationConstraint>"""
S3_BUCKET_LIFECYCLE_CONFIGURATION = """<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    {% for rule in rules %}
    <Rule>
        <ID>{{ rule.id }}</ID>
        <Prefix>{{ rule.prefix if rule.prefix != None }}</Prefix>
        <Status>{{ rule.status }}</Status>
        {% if rule.storage_class %}
        <Transition>
            {% if rule.transition_days %}
            <Days>{{ rule.transition_days }}</Days>
            {% endif %}
            {% if rule.transition_date %}
            <Date>{{ rule.transition_date }}</Date>
            {% endif %}
            <StorageClass>{{ rule.storage_class }}</StorageClass>
        </Transition>
        {% endif %}
        {% if rule.expiration_days or rule.expiration_date %}
        <Expiration>
            {% if rule.expiration_days %}
            <Days>{{ rule.expiration_days }}</Days>
            {% endif %}
            {% if rule.expiration_date %}
            <Date>{{ rule.expiration_date }}</Date>
            {% endif %}
        </Expiration>
        {% endif %}
    </Rule>
    {% endfor %}
</LifecycleConfiguration>"""
S3_BUCKET_VERSIONING = """<?xml version="1.0" encoding="UTF-8"?>
<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <Status>{{ bucket_versioning_status }}</Status>
</VersioningConfiguration>"""
S3_BUCKET_GET_VERSIONING = """<?xml version="1.0" encoding="UTF-8"?>
{% if status is none %}
    <VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>
{% else %}
    <VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
        <Status>{{ status }}</Status>
    </VersioningConfiguration>
{% endif %}
"""
S3_BUCKET_GET_VERSIONS = """<?xml version="1.0" encoding="UTF-8"?>
<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
    <Name>{{ bucket.name }}</Name>
    <Prefix>{{ prefix }}</Prefix>
    <KeyMarker>{{ key_marker }}</KeyMarker>
    <MaxKeys>{{ max_keys }}</MaxKeys>
    <IsTruncated>{{ is_truncated }}</IsTruncated>
    {% for key in key_list %}
    <Version>
        <Key>{{ key.name }}</Key>
        <VersionId>{{ key._version_id }}</VersionId>
        <IsLatest>false</IsLatest>
        <LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
        <ETag>{{ key.etag }}</ETag>
        <Size>{{ key.size }}</Size>
        <StorageClass>{{ key.storage_class }}</StorageClass>
        <Owner>
            <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
            <DisplayName>webfile</DisplayName>
        </Owner>
    </Version>
    {% endfor %}
</ListVersionsResult>"""
S3_DELETE_KEYS_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
{% for k in deleted %}
<Deleted>
<Key>{{k}}</Key>
</Deleted>
{% endfor %}
{% for k in delete_errors %}
<Error>
<Key>{{k}}</Key>
</Error>
{% endfor %}
</DeleteResult>"""
S3_DELETE_OBJECT_SUCCESS = """<DeleteObjectResponse xmlns="http://s3.amazonaws.com/doc/2006-03-01">
  <DeleteObjectResponse>
    <Code>200</Code>
    <Description>OK</Description>
  </DeleteObjectResponse>
</DeleteObjectResponse>"""
S3_OBJECT_RESPONSE = """<PutObjectResponse xmlns="http://s3.amazonaws.com/doc/2006-03-01">
  <ETag>{{ key.etag }}</ETag>
  <LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
</PutObjectResponse>"""
S3_OBJECT_ACL_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Owner>
    <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
    <DisplayName>webfile</DisplayName>
  </Owner>
  <AccessControlList>
    {% for grant in obj.acl.grants %}
    <Grant>
      {% for grantee in grant.grantees %}
      <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:type="{{ grantee.type }}">
        {% if grantee.uri %}
        <URI>{{ grantee.uri }}</URI>
        {% endif %}
        {% if grantee.id %}
        <ID>{{ grantee.id }}</ID>
        {% endif %}
        {% if grantee.display_name %}
        <DisplayName>{{ grantee.display_name }}</DisplayName>
        {% endif %}
      </Grantee>
      {% endfor %}
      {% for permission in grant.permissions %}
      <Permission>{{ permission }}</Permission>
      {% endfor %}
    </Grant>
    {% endfor %}
  </AccessControlList>
</AccessControlPolicy>"""
S3_OBJECT_COPY_RESPONSE = """<CopyObjectResponse xmlns="http://doc.s3.amazonaws.com/2006-03-01">
    <ETag>{{ key.etag }}</ETag>
    <LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
</CopyObjectResponse>"""
S3_MULTIPART_INITIATE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <UploadId>{{ upload_id }}</UploadId>
</InitiateMultipartUploadResult>"""
S3_MULTIPART_UPLOAD_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CopyPartResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
  <ETag>{{ part.etag }}</ETag>
</CopyPartResult>"""
S3_MULTIPART_LIST_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<ListPartsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <UploadId>{{ upload_id }}</UploadId>
  <StorageClass>STANDARD</StorageClass>
  <Initiator>
    <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
    <DisplayName>webfile</DisplayName>
  </Initiator>
  <Owner>
    <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
    <DisplayName>webfile</DisplayName>
  </Owner>
  <StorageClass>STANDARD</StorageClass>
  <PartNumberMarker>1</PartNumberMarker>
  <NextPartNumberMarker>{{ count }}</NextPartNumberMarker>
  <MaxParts>{{ count }}</MaxParts>
  <IsTruncated>false</IsTruncated>
  {% for part in parts %}
  <Part>
    <PartNumber>{{ part.name }}</PartNumber>
    <LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
    <ETag>{{ part.etag }}</ETag>
    <Size>{{ part.size }}</Size>
  </Part>
  {% endfor %}
</ListPartsResult>"""
S3_MULTIPART_COMPLETE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Location>http://{{ bucket_name }}.s3.amazonaws.com/{{ key_name }}</Location>
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <ETag>{{ etag }}</ETag>
</CompleteMultipartUploadResult>"""
S3_ALL_MULTIPARTS = """<?xml version="1.0" encoding="UTF-8"?>
<ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>{{ bucket_name }}</Bucket>
  <KeyMarker></KeyMarker>
  <UploadIdMarker></UploadIdMarker>
  <MaxUploads>1000</MaxUploads>
  <IsTruncated>False</IsTruncated>
  {% for upload in uploads %}
  <Upload>
    <Key>{{ upload.key_name }}</Key>
    <UploadId>{{ upload.id }}</UploadId>
    <Initiator>
      <ID>arn:aws:iam::111122223333:user/user1-11111a31-17b5-4fb7-9df5-b111111f13de</ID>
      <DisplayName>user1-11111a31-17b5-4fb7-9df5-b111111f13de</DisplayName>
    </Initiator>
    <Owner>
      <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
      <DisplayName>webfile</DisplayName>
    </Owner>
    <StorageClass>STANDARD</StorageClass>
    <Initiated>2010-11-10T20:48:33.000Z</Initiated>
  </Upload>
  {% endfor %}
</ListMultipartUploadsResult>"""
S3_NO_POLICY = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>NoSuchBucketPolicy</Code>
  <Message>The bucket policy does not exist</Message>
  <BucketName>{{ bucket_name }}</BucketName>
  <RequestId>0D68A23BB2E2215B</RequestId>
  <HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>"""