Merge pull request #64 from kouk/master

Multipart upload support
Steve Pulec 2013-11-23 07:26:04 -08:00
commit 4853954c55
3 changed files with 212 additions and 1 deletion


@@ -1,3 +1,5 @@
import os
import base64
import datetime
import hashlib
@@ -5,6 +7,9 @@ from moto.core import BaseBackend
from moto.core.utils import iso_8601_datetime, rfc_1123_datetime
from .utils import clean_key_name

UPLOAD_ID_BYTES = 43
UPLOAD_PART_MIN_SIZE = 5242880


class FakeKey(object):
    def __init__(self, name, value):
@@ -23,7 +28,7 @@ class FakeKey(object):
    @property
    def etag(self):
        value_md5 = hashlib.md5()
        value_md5.update(bytes(self.value))
        return '"{0}"'.format(value_md5.hexdigest())

    @property
@@ -52,10 +57,48 @@ class FakeKey(object):
        return len(self.value)
class FakeMultipart(object):
    def __init__(self, key_name):
        self.key_name = key_name
        self.parts = {}
        self.id = base64.b64encode(os.urandom(UPLOAD_ID_BYTES)).replace('=', '').replace('+', '')

    def complete(self):
        total = bytearray()
        last_part_name = len(self.list_parts())
        for part in self.list_parts():
            if part.name != last_part_name and len(part.value) < UPLOAD_PART_MIN_SIZE:
                return
            total.extend(part.value)
        return total

    def set_part(self, part_id, value):
        if part_id < 1:
            return

        key = FakeKey(part_id, value)
        self.parts[part_id] = key
        return key

    def list_parts(self):
        parts = []
        for part_id, index in enumerate(sorted(self.parts.keys()), start=1):
            # Make sure part ids are continuous
            if part_id != index:
                return
            parts.append(self.parts[part_id])
        return parts
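
Editor's note: a minimal sketch of how FakeMultipart behaves on its own, assuming the class and constants above are in scope; this is illustrative, not code from the pull request.

multipart = FakeMultipart('my-key')

# Parts may arrive in any order; set_part() keys them by part number.
multipart.set_part(2, 'tail')
multipart.set_part(1, '0' * UPLOAD_PART_MIN_SIZE)

# list_parts() returns the parts sorted by number, or None when the
# numbers are not a contiguous run starting at 1.
assert [part.name for part in multipart.list_parts()] == [1, 2]

# complete() concatenates the parts; a non-final part smaller than
# UPLOAD_PART_MIN_SIZE makes it return None instead.
assert len(multipart.complete()) == UPLOAD_PART_MIN_SIZE + len('tail')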
class FakeBucket(object):
    def __init__(self, name):
        self.name = name
        self.keys = {}
        self.multiparts = {}


class S3Backend(BaseBackend):
@@ -106,6 +149,36 @@ class S3Backend(BaseBackend):
        if bucket:
            return bucket.keys.get(key_name)

    def initiate_multipart(self, bucket_name, key_name):
        bucket = self.buckets[bucket_name]
        new_multipart = FakeMultipart(key_name)
        bucket.multiparts[new_multipart.id] = new_multipart
        return new_multipart

    def complete_multipart(self, bucket_name, multipart_id):
        bucket = self.buckets[bucket_name]
        multipart = bucket.multiparts[multipart_id]
        value = multipart.complete()
        if value is None:
            return
        del bucket.multiparts[multipart_id]
        return self.set_key(bucket_name, multipart.key_name, value)

    def cancel_multipart(self, bucket_name, multipart_id):
        bucket = self.buckets[bucket_name]
        del bucket.multiparts[multipart_id]

    def list_multipart(self, bucket_name, multipart_id):
        bucket = self.buckets[bucket_name]
        return bucket.multiparts[multipart_id].list_parts()

    def set_part(self, bucket_name, multipart_id, part_id, value):
        bucket = self.buckets[bucket_name]
        multipart = bucket.multiparts[multipart_id]
        return multipart.set_part(part_id, value)
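
Editor's note: the five backend methods above mirror the S3 multipart API one-to-one. A hedged end-to-end sketch follows; create_bucket, set_key, and get_key are pre-existing backend methods, and the snippet itself is illustrative, not part of the diff.

backend = S3Backend()
backend.create_bucket('foobar')  # pre-existing backend API

multipart = backend.initiate_multipart('foobar', 'the-key')
backend.set_part('foobar', multipart.id, 1, '0' * UPLOAD_PART_MIN_SIZE)
backend.set_part('foobar', multipart.id, 2, 'tail')

# complete_multipart() stitches the parts into an ordinary key and
# discards the in-progress upload; it returns None when
# FakeMultipart.complete() refuses (undersized or non-contiguous parts).
key = backend.complete_multipart('foobar', multipart.id)
assert key is backend.get_key('foobar', 'the-key')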
    def prefix_query(self, bucket, prefix, delimiter):
        key_results = set()
        folder_results = set()


@@ -116,6 +116,7 @@ class ResponseObject(object):
    def _key_response(self, request, full_url, headers):
        parsed_url = urlparse(full_url)
        query = parse_qs(parsed_url.query)
        method = request.method

        key_name = self.parse_key_name(parsed_url.path)
@@ -130,6 +131,17 @@ class ResponseObject(object):
        body = request.data

        if method == 'GET':
            if 'uploadId' in query:
                upload_id = query['uploadId'][0]
                parts = self.backend.list_multipart(bucket_name, upload_id)
                template = Template(S3_MULTIPART_LIST_RESPONSE)
                return 200, headers, template.render(
                    bucket_name=bucket_name,
                    key_name=key_name,
                    upload_id=upload_id,
                    count=len(parts),
                    parts=parts
                )
            key = self.backend.get_key(bucket_name, key_name)
            if key:
                headers.update(key.metadata)
@@ -137,6 +149,14 @@ class ResponseObject(object):
            else:
                return 404, headers, ""
        if method == 'PUT':
            if 'uploadId' in query and 'partNumber' in query and body:
                upload_id = query['uploadId'][0]
                part_number = int(query['partNumber'][0])
                key = self.backend.set_part(bucket_name, upload_id, part_number, body)
                template = Template(S3_MULTIPART_UPLOAD_RESPONSE)
                headers.update(key.response_dict)
                return 200, headers, template.render(part=key)
            if 'x-amz-copy-source' in request.headers:
                # Copy key
                src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1)
@@ -177,9 +197,39 @@ class ResponseObject(object):
            else:
                return 404, headers, ""
        elif method == 'DELETE':
            if 'uploadId' in query:
                upload_id = query['uploadId'][0]
                self.backend.cancel_multipart(bucket_name, upload_id)
                return 204, headers, ""
            removed_key = self.backend.delete_key(bucket_name, key_name)
            template = Template(S3_DELETE_OBJECT_SUCCESS)
            return 204, headers, template.render(bucket=removed_key)
        elif method == 'POST':
            if body == '' and parsed_url.query == 'uploads':
                multipart = self.backend.initiate_multipart(bucket_name, key_name)
                template = Template(S3_MULTIPART_INITIATE_RESPONSE)
                response = template.render(
                    bucket_name=bucket_name,
                    key_name=key_name,
                    upload_id=multipart.id,
                )
                return 200, headers, response

            if 'uploadId' in query:
                upload_id = query['uploadId'][0]
                key = self.backend.complete_multipart(bucket_name, upload_id)

                if key is not None:
                    template = Template(S3_MULTIPART_COMPLETE_RESPONSE)
                    return 200, headers, template.render(
                        bucket_name=bucket_name,
                        key_name=key.name,
                        etag=key.etag,
                    )
                template = Template(S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR)
                return 400, headers, template.render()
else:
raise NotImplementedError("Method POST had only been implemented for multipart uploads so far")
else: else:
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method)) raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))
@@ -279,3 +329,62 @@ S3_OBJECT_COPY_RESPONSE = """<CopyObjectResponse xmlns="http://doc.s3.amazonaws.
        <LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
    </CopyObjectResponse>
</CopyObjectResponse>"""
S3_MULTIPART_INITIATE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <UploadId>{{ upload_id }}</UploadId>
</InitiateMultipartUploadResult>"""

S3_MULTIPART_UPLOAD_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CopyPartResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
  <ETag>{{ part.etag }}</ETag>
</CopyPartResult>"""

S3_MULTIPART_LIST_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<ListPartsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <UploadId>{{ upload_id }}</UploadId>
  <StorageClass>STANDARD</StorageClass>
  <Initiator>
    <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
    <DisplayName>webfile</DisplayName>
  </Initiator>
  <Owner>
    <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
    <DisplayName>webfile</DisplayName>
  </Owner>
  <StorageClass>STANDARD</StorageClass>
  <PartNumberMarker>1</PartNumberMarker>
  <NextPartNumberMarker>{{ count }}</NextPartNumberMarker>
  <MaxParts>{{ count }}</MaxParts>
  <IsTruncated>false</IsTruncated>
  {% for part in parts %}
  <Part>
    <PartNumber>{{ part.name }}</PartNumber>
    <LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
    <ETag>{{ part.etag }}</ETag>
    <Size>{{ part.size }}</Size>
  </Part>
  {% endfor %}
</ListPartsResult>"""

S3_MULTIPART_COMPLETE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Location>http://{{ bucket_name }}.s3.amazonaws.com/{{ key_name }}</Location>
  <Bucket>{{ bucket_name }}</Bucket>
  <Key>{{ key_name }}</Key>
  <ETag>{{ etag }}</ETag>
</CompleteMultipartUploadResult>
"""

S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>EntityTooSmall</Code>
  <Message>Your proposed upload is smaller than the minimum allowed object size.</Message>
  <RequestId>asdfasdfsdafds</RequestId>
  <HostId>sdfgdsfgdsfgdfsdsfgdfs</HostId>
</Error>"""


@@ -1,4 +1,5 @@
import urllib2
from io import BytesIO

import boto
from boto.exception import S3ResponseError
@@ -37,6 +38,34 @@ def test_my_model_save():
    conn.get_bucket('mybucket').get_key('steve').get_contents_as_string().should.equal('is awesome')
@mock_s3
def test_multipart_upload_too_small():
    conn = boto.connect_s3('the_key', 'the_secret')
    bucket = conn.create_bucket("foobar")

    multipart = bucket.initiate_multipart_upload("the-key")
    multipart.upload_part_from_file(BytesIO('hello'), 1)
    multipart.upload_part_from_file(BytesIO('world'), 2)
    # a multipart upload with non-final parts under 5 MB is refused
    multipart.complete_upload.should.throw(S3ResponseError)
@mock_s3
def test_multipart_upload():
    conn = boto.connect_s3('the_key', 'the_secret')
    bucket = conn.create_bucket("foobar")

    multipart = bucket.initiate_multipart_upload("the-key")
    part1 = '0' * 5242880
    multipart.upload_part_from_file(BytesIO(part1), 1)
    # the last part can be less than 5 MB
    part2 = '1'
    multipart.upload_part_from_file(BytesIO(part2), 2)
    multipart.complete_upload()
    # we should get both parts back as the key contents
    bucket.get_key("the-key").get_contents_as_string().should.equal(part1 + part2)
@mock_s3
def test_missing_key():
    conn = boto.connect_s3('the_key', 'the_secret')