Merge pull request #297 from kouk/multipart_errors

Support for client-supplied part order in multipart uploads
Steve Pulec 2015-02-14 08:30:10 -05:00
commit 4d4cb39769
6 changed files with 213 additions and 99 deletions
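
For context: S3's CompleteMultipartUpload request carries the client's part list as XML, e.g.

    <CompleteMultipartUpload>
      <Part><PartNumber>1</PartNumber><ETag>"etag-of-part-1"</ETag></Part>
      <Part><PartNumber>2</PartNumber><ETag>"etag-of-part-2"</ETag></Part>
    </CompleteMultipartUpload>

Before this change moto ignored that body and assembled parts from its own bookkeeping; with it, the body is parsed and validated, and failures surface as S3-style errors (InvalidPartOrder, InvalidPart, EntityTooSmall) through a new shared RESTError base class.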

moto/core/exceptions.py (new file)

@@ -0,0 +1,28 @@
+from werkzeug.exceptions import HTTPException
+from jinja2 import DictLoader, Environment
+
+
+ERROR_RESPONSE = u"""<?xml version="1.0" encoding="UTF-8"?>
+<Response>
+    <Errors>
+        <Error>
+            <Code>{{code}}</Code>
+            <Message>{{message}}</Message>
+            {% block extra %}{% endblock %}
+        </Error>
+    </Errors>
+    <RequestID>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestID>
+</Response>
+"""
+
+
+class RESTError(HTTPException):
+    templates = {
+        'error': ERROR_RESPONSE
+    }
+
+    def __init__(self, code, message, template='error', **kwargs):
+        super(RESTError, self).__init__()
+        env = Environment(loader=DictLoader(self.templates))
+        self.description = env.get_template(template).render(
+            code=code, message=message, **kwargs)
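
The pattern: a service error subclasses RESTError, sets an HTTP status via the werkzeug `code` attribute, and passes an error code and message into the shared Jinja template. A minimal sketch (the subclass name here is hypothetical):

    from moto.core.exceptions import RESTError

    class DummyServiceError(RESTError):   # hypothetical, for illustration
        code = 400                        # HTTP status consumed by werkzeug

    err = DummyServiceError("DummyCode", "something went wrong")
    # err.description now holds the ERROR_RESPONSE XML with <Code> and
    # <Message> filled in; the empty {% block extra %} is the hook that
    # lets subclasses add extra elements via their own templates.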

moto/ec2/exceptions.py

@@ -1,13 +1,9 @@
 from __future__ import unicode_literals
-from werkzeug.exceptions import BadRequest
-from jinja2 import Template
+from moto.core.exceptions import RESTError


-class EC2ClientError(BadRequest):
-    def __init__(self, code, message):
-        super(EC2ClientError, self).__init__()
-        self.description = ERROR_RESPONSE_TEMPLATE.render(
-            code=code, message=message)
+class EC2ClientError(RESTError):
+    code = 400


 class DependencyViolationError(EC2ClientError):
@@ -306,17 +302,3 @@ class InvalidCIDRSubnetError(EC2ClientError):
             "InvalidParameterValue",
             "invalid CIDR subnet specification: {0}"
             .format(cidr))
-
-
-ERROR_RESPONSE = u"""<?xml version="1.0" encoding="UTF-8"?>
-<Response>
-    <Errors>
-        <Error>
-            <Code>{{code}}</Code>
-            <Message>{{message}}</Message>
-        </Error>
-    </Errors>
-    <RequestID>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestID>
-</Response>
-"""
-ERROR_RESPONSE_TEMPLATE = Template(ERROR_RESPONSE)
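
With the shared base class in place, EC2 errors no longer carry their own copy of the response template; the class attribute supplies the HTTP status and RESTError renders the body. Roughly:

    from moto.ec2.exceptions import EC2ClientError

    err = EC2ClientError("InvalidParameterValue", "invalid CIDR subnet specification: 10.0.0.0/8")
    # err.code == 400 (class attribute consumed by werkzeug)
    # err.description is the shared XML error body from moto.core.exceptions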

moto/s3/exceptions.py

@@ -1,9 +1,73 @@
 from __future__ import unicode_literals
+from moto.core.exceptions import RESTError

-class BucketAlreadyExists(Exception):
-    pass

-class MissingBucket(Exception):
-    pass
+ERROR_WITH_BUCKET_NAME = """{% extends 'error' %}
+{% block extra %}<BucketName>{{ bucket }}</BucketName>{% endblock %}
+"""
+
+
+class S3ClientError(RESTError):
+    pass
+
+
+class BucketError(S3ClientError):
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault('template', 'bucket_error')
+        self.templates['bucket_error'] = ERROR_WITH_BUCKET_NAME
+        super(BucketError, self).__init__(*args, **kwargs)
+
+
+class BucketAlreadyExists(BucketError):
+    code = 409
+
+    def __init__(self, *args, **kwargs):
+        super(BucketAlreadyExists, self).__init__(
+            "BucketAlreadyExists",
+            ("The requested bucket name is not available. The bucket "
+             "namespace is shared by all users of the system. Please "
+             "select a different name and try again"),
+            *args, **kwargs)
+
+
+class MissingBucket(BucketError):
+    code = 404
+
+    def __init__(self, *args, **kwargs):
+        super(MissingBucket, self).__init__(
+            "NoSuchBucket",
+            "The specified bucket does not exist",
+            *args, **kwargs)
+
+
+class InvalidPartOrder(S3ClientError):
+    code = 400
+
+    def __init__(self, *args, **kwargs):
+        super(InvalidPartOrder, self).__init__(
+            "InvalidPartOrder",
+            ("The list of parts was not in ascending order. The parts "
+             "list must be specified in order by part number."),
+            *args, **kwargs)
+
+
+class InvalidPart(S3ClientError):
+    code = 400
+
+    def __init__(self, *args, **kwargs):
+        super(InvalidPart, self).__init__(
+            "InvalidPart",
+            ("One or more of the specified parts could not be found. "
+             "The part might not have been uploaded, or the specified "
+             "entity tag might not have matched the part's entity tag."),
+            *args, **kwargs)
+
+
+class EntityTooSmall(S3ClientError):
+    code = 400
+
+    def __init__(self, *args, **kwargs):
+        super(EntityTooSmall, self).__init__(
+            "EntityTooSmall",
+            "Your proposed upload is smaller than the minimum allowed object size.",
+            *args, **kwargs)
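
Because BucketError registers the 'bucket_error' template (which extends the core 'error' template), bucket-scoped errors can carry the bucket name. If this reading of the templates is right, MissingBucket(bucket='foobar').description renders roughly:

    <?xml version="1.0" encoding="UTF-8"?>
    <Response>
        <Errors>
            <Error>
                <Code>NoSuchBucket</Code>
                <Message>The specified bucket does not exist</Message>
                <BucketName>foobar</BucketName>
            </Error>
        </Errors>
        <RequestID>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestID>
    </Response>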

moto/s3/models.py

@@ -8,9 +8,10 @@ import itertools
 import codecs
 import six
+from bisect import insort

 from moto.core import BaseBackend
 from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime
-from .exceptions import BucketAlreadyExists, MissingBucket
+from .exceptions import BucketAlreadyExists, MissingBucket, InvalidPart, EntityTooSmall
 from .utils import clean_key_name, _VersionedKeyStore

 UPLOAD_ID_BYTES = 43
@@ -118,25 +119,32 @@ class FakeMultipart(object):
         self.key_name = key_name
         self.metadata = metadata
         self.parts = {}
+        self.partlist = []  # ordered list of part ID's
         rand_b64 = base64.b64encode(os.urandom(UPLOAD_ID_BYTES))
         self.id = rand_b64.decode('utf-8').replace('=', '').replace('+', '')

-    def complete(self):
+    def complete(self, body):
         decode_hex = codecs.getdecoder("hex_codec")
         total = bytearray()
         md5s = bytearray()
-        last_part_name = len(self.list_parts())

-        for part in self.list_parts():
-            if part.name != last_part_name and len(part.value) < UPLOAD_PART_MIN_SIZE:
-                return None, None
+        last = None
+        count = 0
+        for pn, etag in body:
+            part = self.parts.get(pn)
+            if part is None or part.etag != etag:
+                raise InvalidPart()
+            if last is not None and len(last.value) < UPLOAD_PART_MIN_SIZE:
+                raise EntityTooSmall()
             part_etag = part.etag.replace('"', '')
             md5s.extend(decode_hex(part_etag)[0])
             total.extend(part.value)
+            last = part
+            count += 1

         etag = hashlib.md5()
         etag.update(bytes(md5s))
-        return total, "{0}-{1}".format(etag.hexdigest(), last_part_name)
+        return total, "{0}-{1}".format(etag.hexdigest(), count)
def set_part(self, part_id, value):
if part_id < 1:
@@ -144,18 +152,12 @@ class FakeMultipart(object):
         key = FakeKey(part_id, value)
         self.parts[part_id] = key
+        insort(self.partlist, part_id)
         return key

     def list_parts(self):
-        parts = []
-        for part_id, index in enumerate(sorted(self.parts.keys()), start=1):
-            # Make sure part ids are continuous
-            if part_id != index:
-                return
-            parts.append(self.parts[part_id])
-        return parts
+        for part_id in self.partlist:
+            yield self.parts[part_id]


 class FakeBucket(object):
@@ -191,7 +193,7 @@ class S3Backend(BaseBackend):
     def create_bucket(self, bucket_name, region_name):
         if bucket_name in self.buckets:
-            raise BucketAlreadyExists()
+            raise BucketAlreadyExists(bucket=bucket_name)
         new_bucket = FakeBucket(name=bucket_name, region_name=region_name)
         self.buckets[bucket_name] = new_bucket
         return new_bucket
@@ -203,7 +205,7 @@ class S3Backend(BaseBackend):
         try:
             return self.buckets[bucket_name]
         except KeyError:
-            raise MissingBucket()
+            raise MissingBucket(bucket=bucket_name)

     def delete_bucket(self, bucket_name):
         bucket = self.get_bucket(bucket_name)
@@ -279,10 +281,10 @@ class S3Backend(BaseBackend):
         return new_multipart

-    def complete_multipart(self, bucket_name, multipart_id):
+    def complete_multipart(self, bucket_name, multipart_id, body):
         bucket = self.get_bucket(bucket_name)
         multipart = bucket.multiparts[multipart_id]
-        value, etag = multipart.complete()
+        value, etag = multipart.complete(body)
         if value is None:
             return

         del bucket.multiparts[multipart_id]
@@ -297,7 +299,7 @@ class S3Backend(BaseBackend):
     def list_multipart(self, bucket_name, multipart_id):
         bucket = self.get_bucket(bucket_name)
-        return bucket.multiparts[multipart_id].list_parts()
+        return list(bucket.multiparts[multipart_id].list_parts())

     def get_all_multiparts(self, bucket_name):
         bucket = self.get_bucket(bucket_name)
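
In short, FakeMultipart now records upload order with partlist/insort, list_parts() became a generator over that order, and complete() takes the client-supplied (part_number, etag) pairs and validates them instead of inferring order from the stored parts. A rough illustration of the new contract (part payloads made up):

    mp = FakeMultipart('the-key', metadata={})
    k1 = mp.set_part(1, b'0' * UPLOAD_PART_MIN_SIZE)
    k2 = mp.set_part(2, b'1')
    value, etag = mp.complete([(1, k1.etag), (2, k2.etag)])
    # value is the concatenated part bytes; etag ends in "-2".
    # An unknown part number or mismatched ETag raises InvalidPart;
    # a non-final part below UPLOAD_PART_MIN_SIZE raises EntityTooSmall.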

moto/s3/responses.py

@@ -7,7 +7,7 @@ from six.moves.urllib.parse import parse_qs, urlparse
 from moto.core.responses import _TemplateEnvironmentMixin

-from .exceptions import BucketAlreadyExists, MissingBucket
+from .exceptions import BucketAlreadyExists, S3ClientError, InvalidPartOrder
 from .models import s3_backend
 from .utils import bucket_name_from_url, metadata_from_headers
 from xml.dom import minidom
@@ -35,8 +35,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
     def bucket_response(self, request, full_url, headers):
         try:
             response = self._bucket_response(request, full_url, headers)
-        except MissingBucket:
-            return 404, headers, ""
+        except S3ClientError as s3error:
+            response = s3error.code, headers, s3error.description

         if isinstance(response, six.string_types):
             return 200, headers, response.encode("utf-8")
@@ -72,12 +72,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
         raise NotImplementedError("Method {0} has not been implemented in the S3 backend yet".format(method))

     def _bucket_response_head(self, bucket_name, headers):
-        try:
-            self.backend.get_bucket(bucket_name)
-        except MissingBucket:
-            return 404, headers, ""
-        else:
-            return 200, headers, ""
+        self.backend.get_bucket(bucket_name)
+        return 200, headers, ""

     def _bucket_response_get(self, bucket_name, querystring, headers):
         if 'uploads' in querystring:
@@ -127,11 +123,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
             is_truncated='false',
         )

-        try:
-            bucket = self.backend.get_bucket(bucket_name)
-        except MissingBucket:
-            return 404, headers, ""
-
+        bucket = self.backend.get_bucket(bucket_name)
         prefix = querystring.get('prefix', [None])[0]
         delimiter = querystring.get('delimiter', [None])[0]
         result_keys, result_folders = self.backend.prefix_query(bucket, prefix, delimiter)
@@ -161,17 +153,12 @@ class ResponseObject(_TemplateEnvironmentMixin):
                 # us-east-1 has different behavior
                 new_bucket = self.backend.get_bucket(bucket_name)
             else:
-                return 409, headers, ""
+                raise
         template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
         return 200, headers, template.render(bucket=new_bucket)

     def _bucket_response_delete(self, bucket_name, headers):
-        try:
-            removed_bucket = self.backend.delete_bucket(bucket_name)
-        except MissingBucket:
-            # Non-existant bucket
-            template = self.response_template(S3_DELETE_NON_EXISTING_BUCKET)
-            return 404, headers, template.render(bucket_name=bucket_name)
+        removed_bucket = self.backend.delete_bucket(bucket_name)

         if removed_bucket:
             # Bucket exists
@@ -231,8 +218,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
     def key_response(self, request, full_url, headers):
         try:
             response = self._key_response(request, full_url, headers)
-        except MissingBucket:
-            return 404, headers, ""
+        except S3ClientError as s3error:
+            response = s3error.code, headers, s3error.description

         if isinstance(response, six.string_types):
             return 200, headers, response
@@ -364,6 +351,15 @@ class ResponseObject(_TemplateEnvironmentMixin):
         template = self.response_template(S3_DELETE_OBJECT_SUCCESS)
         return 204, headers, template.render(bucket=removed_key)

+    def _complete_multipart_body(self, body):
+        ps = minidom.parseString(body).getElementsByTagName('Part')
+        prev = 0
+        for p in ps:
+            pn = int(p.getElementsByTagName('PartNumber')[0].firstChild.wholeText)
+            if pn <= prev:
+                raise InvalidPartOrder()
+            yield (pn, p.getElementsByTagName('ETag')[0].firstChild.wholeText)
+
     def _key_response_post(self, request, body, parsed_url, bucket_name, query, key_name, headers):
         if body == b'' and parsed_url.query == 'uploads':
             metadata = metadata_from_headers(request.headers)
@@ -378,18 +374,15 @@ class ResponseObject(_TemplateEnvironmentMixin):
             return 200, headers, response

         if 'uploadId' in query:
+            body = self._complete_multipart_body(body)
             upload_id = query['uploadId'][0]
-            key = self.backend.complete_multipart(bucket_name, upload_id)
-
-            if key is not None:
-                template = self.response_template(S3_MULTIPART_COMPLETE_RESPONSE)
-                return template.render(
-                    bucket_name=bucket_name,
-                    key_name=key.name,
-                    etag=key.etag,
-                )
-            template = self.response_template(S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR)
-            return 400, headers, template.render()
+            key = self.backend.complete_multipart(bucket_name, upload_id, body)
+            template = self.response_template(S3_MULTIPART_COMPLETE_RESPONSE)
+            return template.render(
+                bucket_name=bucket_name,
+                key_name=key.name,
+                etag=key.etag,
+            )
         elif parsed_url.query == 'restore':
             es = minidom.parseString(body).getElementsByTagName('Days')
             days = es[0].childNodes[0].wholeText
@@ -461,14 +454,6 @@ S3_DELETE_BUCKET_SUCCESS = """<DeleteBucketResponse xmlns="http://s3.amazonaws.c
   </DeleteBucketResponse>
 </DeleteBucketResponse>"""

-S3_DELETE_NON_EXISTING_BUCKET = """<?xml version="1.0" encoding="UTF-8"?>
-<Error><Code>NoSuchBucket</Code>
-<Message>The specified bucket does not exist</Message>
-<BucketName>{{ bucket_name }}</BucketName>
-<RequestId>asdfasdfsadf</RequestId>
-<HostId>asfasdfsfsafasdf</HostId>
-</Error>"""
-
 S3_DELETE_BUCKET_WITH_ITEMS_ERROR = """<?xml version="1.0" encoding="UTF-8"?>
 <Error><Code>BucketNotEmpty</Code>
 <Message>The bucket you tried to delete is not empty</Message>
@@ -609,14 +594,6 @@ S3_MULTIPART_COMPLETE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
 </CompleteMultipartUploadResult>
 """

-S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR = """<?xml version="1.0" encoding="UTF-8"?>
-<Error>
-  <Code>EntityTooSmall</Code>
-  <Message>Your proposed upload is smaller than the minimum allowed object size.</Message>
-  <RequestId>asdfasdfsdafds</RequestId>
-  <HostId>sdfgdsfgdsfgdfsdsfgdfs</HostId>
-</Error>"""
-
 S3_ALL_MULTIPARTS = """<?xml version="1.0" encoding="UTF-8"?>
 <ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
   <Bucket>{{ bucket_name }}</Bucket>
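
The response side now funnels every S3ClientError into a (status, headers, body) tuple, and _complete_multipart_body turns the CompleteMultipartUpload XML into the (part_number, etag) pairs the backend consumes. A sketch of that parsing (the instance name is hypothetical):

    body = (b"<CompleteMultipartUpload>"
            b"<Part><PartNumber>1</PartNumber><ETag>\"etag1\"</ETag></Part>"
            b"<Part><PartNumber>2</PartNumber><ETag>\"etag2\"</ETag></Part>"
            b"</CompleteMultipartUpload>")
    list(response_object._complete_multipart_body(body))
    # => [(1, '"etag1"'), (2, '"etag2"')]
    # Part numbers that fail the ordering check raise InvalidPartOrder.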

tests/test_s3/test_s3.py

@@ -19,6 +19,25 @@ import sure  # noqa
 from moto import mock_s3

+REDUCED_PART_SIZE = 256
+
+
+def reduced_min_part_size(f):
+    """ speed up tests by temporarily making the multipart minimum part size
+        small
+    """
+    import moto.s3.models as s3model
+    orig_size = s3model.UPLOAD_PART_MIN_SIZE
+
+    def wrapped(*args, **kwargs):
+        try:
+            s3model.UPLOAD_PART_MIN_SIZE = REDUCED_PART_SIZE
+            return f(*args, **kwargs)
+        finally:
+            s3model.UPLOAD_PART_MIN_SIZE = orig_size
+    return wrapped
+
+
 class MyModel(object):
     def __init__(self, name, value):
         self.name = name
@@ -72,12 +91,13 @@ def test_multipart_upload_too_small():
 @mock_s3
+@reduced_min_part_size
 def test_multipart_upload():
     conn = boto.connect_s3('the_key', 'the_secret')
     bucket = conn.create_bucket("foobar")

     multipart = bucket.initiate_multipart_upload("the-key")
-    part1 = b'0' * 5242880
+    part1 = b'0' * REDUCED_PART_SIZE
     multipart.upload_part_from_file(BytesIO(part1), 1)
     # last part, can be less than 5 MB
     part2 = b'1'
@@ -88,6 +108,24 @@ def test_multipart_upload():
 @mock_s3
+@reduced_min_part_size
+def test_multipart_upload_out_of_order():
+    conn = boto.connect_s3('the_key', 'the_secret')
+    bucket = conn.create_bucket("foobar")
+
+    multipart = bucket.initiate_multipart_upload("the-key")
+    # last part, can be less than 5 MB
+    part2 = b'1'
+    multipart.upload_part_from_file(BytesIO(part2), 4)
+    part1 = b'0' * REDUCED_PART_SIZE
+    multipart.upload_part_from_file(BytesIO(part1), 2)
+    multipart.complete_upload()
+    # we should get both parts as the key contents
+    bucket.get_key("the-key").get_contents_as_string().should.equal(part1 + part2)
+
+
+@mock_s3
+@reduced_min_part_size
 def test_multipart_upload_with_headers():
     conn = boto.connect_s3('the_key', 'the_secret')
     bucket = conn.create_bucket("foobar")
@@ -102,6 +140,7 @@ def test_multipart_upload_with_headers():
 @mock_s3
+@reduced_min_part_size
 def test_multipart_upload_with_copy_key():
     conn = boto.connect_s3('the_key', 'the_secret')
     bucket = conn.create_bucket("foobar")
@@ -110,7 +149,7 @@ def test_multipart_upload_with_copy_key():
     key.set_contents_from_string("key_value")

     multipart = bucket.initiate_multipart_upload("the-key")
-    part1 = b'0' * 5242880
+    part1 = b'0' * REDUCED_PART_SIZE
     multipart.upload_part_from_file(BytesIO(part1), 1)
     multipart.copy_part_from_key("foobar", "original-key", 2)
     multipart.complete_upload()
@@ -118,12 +157,13 @@ def test_multipart_upload_with_copy_key():
 @mock_s3
+@reduced_min_part_size
 def test_multipart_upload_cancel():
     conn = boto.connect_s3('the_key', 'the_secret')
     bucket = conn.create_bucket("foobar")

     multipart = bucket.initiate_multipart_upload("the-key")
-    part1 = b'0' * 5242880
+    part1 = b'0' * REDUCED_PART_SIZE
     multipart.upload_part_from_file(BytesIO(part1), 1)
     multipart.cancel_upload()
     # TODO we really need some sort of assertion here, but we don't currently
@@ -131,13 +171,14 @@ def test_multipart_upload_cancel():
 @mock_s3
+@reduced_min_part_size
 def test_multipart_etag():
     # Create Bucket so that test can run
     conn = boto.connect_s3('the_key', 'the_secret')
     bucket = conn.create_bucket('mybucket')

     multipart = bucket.initiate_multipart_upload("the-key")
-    part1 = b'0' * 5242880
+    part1 = b'0' * REDUCED_PART_SIZE
     multipart.upload_part_from_file(BytesIO(part1), 1)
     # last part, can be less than 5 MB
     part2 = b'1'
@@ -148,6 +189,26 @@ def test_multipart_etag():
         '"140f92a6df9f9e415f74a1463bcee9bb-2"')


+@mock_s3
+@reduced_min_part_size
+def test_multipart_invalid_order():
+    # Create Bucket so that test can run
+    conn = boto.connect_s3('the_key', 'the_secret')
+    bucket = conn.create_bucket('mybucket')
+
+    multipart = bucket.initiate_multipart_upload("the-key")
+    part1 = b'0' * 5242880
+    etag1 = multipart.upload_part_from_file(BytesIO(part1), 1).etag
+    # last part, can be less than 5 MB
+    part2 = b'1'
+    etag2 = multipart.upload_part_from_file(BytesIO(part2), 2).etag
+    xml = "<Part><PartNumber>{0}</PartNumber><ETag>{1}</ETag></Part>"
+    xml = xml.format(2, etag2) + xml.format(1, etag1)
+    xml = "<CompleteMultipartUpload>{0}</CompleteMultipartUpload>".format(xml)
+    bucket.complete_multipart_upload.when.called_with(
+        multipart.key_name, multipart.id, xml).should.throw(S3ResponseError)
+
+
 @mock_s3
 def test_list_multiparts():
     # Create Bucket so that test can run
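
Assuming the project's usual nose-based workflow, the new tests can presumably be run on their own with something like:

    nosetests tests/test_s3/test_s3.py:test_multipart_upload_out_of_order
    nosetests tests/test_s3/test_s3.py:test_multipart_invalid_order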