Added tests and made current tests pass
This commit is contained in:
parent
d8e9301c54
commit
7de4399b93
@ -70,18 +70,28 @@ def _bucket_response(request, full_url, headers):
|
|||||||
return 409, headers, template.render(bucket=removed_bucket)
|
return 409, headers, template.render(bucket=removed_bucket)
|
||||||
elif method == 'POST':
|
elif method == 'POST':
|
||||||
#POST to bucket-url should create file from form
|
#POST to bucket-url should create file from form
|
||||||
key = request.form['key']
|
if hasattr(request, 'form'):
|
||||||
f = request.form['file']
|
#Not HTTPretty
|
||||||
new_key = s3_backend.set_key(bucket_name, key, "")
|
form = request.form
|
||||||
#TODO Set actual file
|
else:
|
||||||
|
#HTTPretty, build new form object
|
||||||
|
form = {}
|
||||||
|
for kv in request.body.split('&'):
|
||||||
|
k, v = kv.split('=')
|
||||||
|
form[k] = v
|
||||||
|
|
||||||
|
key = form['key']
|
||||||
|
f = form['file']
|
||||||
|
|
||||||
|
new_key = s3_backend.set_key(bucket_name, key, f)
|
||||||
|
|
||||||
#Metadata
|
#Metadata
|
||||||
meta_regex = re.compile('^x-amz-meta-([a-zA-Z0-9\-_]+)$', flags=re.IGNORECASE)
|
meta_regex = re.compile('^x-amz-meta-([a-zA-Z0-9\-_]+)$', flags=re.IGNORECASE)
|
||||||
for form_id in request.form:
|
for form_id in form:
|
||||||
result = meta_regex.match(form_id)
|
result = meta_regex.match(form_id)
|
||||||
if result:
|
if result:
|
||||||
meta_key = result.group(0).lower()
|
meta_key = result.group(0).lower()
|
||||||
metadata = request.form[form_id]
|
metadata = form[form_id]
|
||||||
new_key.set_metadata(meta_key, metadata)
|
new_key.set_metadata(meta_key, metadata)
|
||||||
return 200, headers, ""
|
return 200, headers, ""
|
||||||
else:
|
else:
|
||||||
@ -101,8 +111,8 @@ def _key_response(request, full_url, headers):
|
|||||||
parsed_url = urlparse(full_url)
|
parsed_url = urlparse(full_url)
|
||||||
method = request.method
|
method = request.method
|
||||||
|
|
||||||
|
key_name = parsed_url.path.lstrip('/')
|
||||||
bucket_name = bucket_name_from_url(full_url)
|
bucket_name = bucket_name_from_url(full_url)
|
||||||
key_name = parsed_url.path.split(bucket_name + '/')[-1]
|
|
||||||
if hasattr(request, 'body'):
|
if hasattr(request, 'body'):
|
||||||
# Boto
|
# Boto
|
||||||
body = request.body
|
body = request.body
|
||||||
@ -140,10 +150,11 @@ def _key_response(request, full_url, headers):
|
|||||||
#Metadata
|
#Metadata
|
||||||
meta_regex = re.compile('^x-amz-meta-([a-zA-Z0-9\-_]+)$', flags=re.IGNORECASE)
|
meta_regex = re.compile('^x-amz-meta-([a-zA-Z0-9\-_]+)$', flags=re.IGNORECASE)
|
||||||
for header in request.headers:
|
for header in request.headers:
|
||||||
result = meta_regex.match(header[0])
|
if isinstance(header, basestring):
|
||||||
|
result = meta_regex.match(header)
|
||||||
if result:
|
if result:
|
||||||
meta_key = result.group(0).lower()
|
meta_key = result.group(0).lower()
|
||||||
metadata = header[1]
|
metadata = request.headers[header]
|
||||||
new_key.set_metadata(meta_key, metadata)
|
new_key.set_metadata(meta_key, metadata)
|
||||||
template = Template(S3_OBJECT_RESPONSE)
|
template = Template(S3_OBJECT_RESPONSE)
|
||||||
headers.update(new_key.response_dict)
|
headers.update(new_key.response_dict)
|
||||||
|
@ -102,6 +102,16 @@ def test_copy_key():
|
|||||||
bucket.get_key("the-key").get_contents_as_string().should.equal("some value")
|
bucket.get_key("the-key").get_contents_as_string().should.equal("some value")
|
||||||
bucket.get_key("new-key").get_contents_as_string().should.equal("some value")
|
bucket.get_key("new-key").get_contents_as_string().should.equal("some value")
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_set_metadata():
|
||||||
|
conn = boto.connect_s3('the_key', 'the_secret')
|
||||||
|
bucket = conn.create_bucket("foobar")
|
||||||
|
key = Key(bucket)
|
||||||
|
key.key = 'the-key'
|
||||||
|
key.set_metadata('md', 'Metadatastring')
|
||||||
|
key.set_contents_from_string("Testval")
|
||||||
|
|
||||||
|
bucket.get_key('the-key').get_metadata('md').should.equal('Metadatastring')
|
||||||
|
|
||||||
@freeze_time("2012-01-01 12:00:00")
|
@freeze_time("2012-01-01 12:00:00")
|
||||||
@mock_s3
|
@mock_s3
|
||||||
@ -163,9 +173,34 @@ def test_get_all_buckets():
|
|||||||
buckets.should.have.length_of(2)
|
buckets.should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_post_to_bucket():
|
||||||
|
conn = boto.connect_s3('the_key', 'the_secret')
|
||||||
|
bucket = conn.create_bucket("foobar")
|
||||||
|
|
||||||
|
requests.post("https://foobar.s3.amazonaws.com/", {
|
||||||
|
'key': 'the-key',
|
||||||
|
'file': 'nothing'
|
||||||
|
})
|
||||||
|
|
||||||
|
bucket.get_key('the-key').get_contents_as_string().should.equal('nothing')
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_post_with_metadata_to_bucket():
|
||||||
|
conn = boto.connect_s3('the_key', 'the_secret')
|
||||||
|
bucket = conn.create_bucket("foobar")
|
||||||
|
|
||||||
|
requests.post("https://foobar.s3.amazonaws.com/", {
|
||||||
|
'key': 'the-key',
|
||||||
|
'file': 'nothing',
|
||||||
|
'x-amz-meta-test': 'metadata'
|
||||||
|
})
|
||||||
|
|
||||||
|
bucket.get_key('the-key').get_metadata('test').should.equal('metadata')
|
||||||
|
|
||||||
@mock_s3
|
@mock_s3
|
||||||
def test_bucket_method_not_implemented():
|
def test_bucket_method_not_implemented():
|
||||||
requests.post.when.called_with("https://foobar.s3.amazonaws.com/").should.throw(NotImplementedError)
|
requests.patch.when.called_with("https://foobar.s3.amazonaws.com/").should.throw(NotImplementedError)
|
||||||
|
|
||||||
|
|
||||||
@mock_s3
|
@mock_s3
|
||||||
|
Loading…
Reference in New Issue
Block a user