commit
b6eb4e4a7d
@ -24,6 +24,7 @@ class FakeKey(object):
|
|||||||
self.name = name
|
self.name = name
|
||||||
self.value = value
|
self.value = value
|
||||||
self.last_modified = datetime.datetime.utcnow()
|
self.last_modified = datetime.datetime.utcnow()
|
||||||
|
self.acl = get_canned_acl('private')
|
||||||
self._storage_class = storage
|
self._storage_class = storage
|
||||||
self._metadata = {}
|
self._metadata = {}
|
||||||
self._expiry = None
|
self._expiry = None
|
||||||
@ -45,6 +46,9 @@ class FakeKey(object):
|
|||||||
def set_storage_class(self, storage_class):
|
def set_storage_class(self, storage_class):
|
||||||
self._storage_class = storage_class
|
self._storage_class = storage_class
|
||||||
|
|
||||||
|
def set_acl(self, acl):
|
||||||
|
self.acl = acl
|
||||||
|
|
||||||
def append_to_value(self, value):
|
def append_to_value(self, value):
|
||||||
self.value += value
|
self.value += value
|
||||||
self.last_modified = datetime.datetime.utcnow()
|
self.last_modified = datetime.datetime.utcnow()
|
||||||
@ -161,6 +165,61 @@ class FakeMultipart(object):
|
|||||||
yield self.parts[part_id]
|
yield self.parts[part_id]
|
||||||
|
|
||||||
|
|
||||||
|
class FakeGrantee(object):
|
||||||
|
def __init__(self, id='', uri='', display_name=''):
|
||||||
|
self.id = id
|
||||||
|
self.uri = uri
|
||||||
|
self.display_name = display_name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def type(self):
|
||||||
|
return 'Group' if self.uri else 'CanonicalUser'
|
||||||
|
|
||||||
|
|
||||||
|
ALL_USERS_GRANTEE = FakeGrantee(uri='http://acs.amazonaws.com/groups/global/AllUsers')
|
||||||
|
AUTHENTICATED_USERS_GRANTEE = FakeGrantee(uri='http://acs.amazonaws.com/groups/global/AuthenticatedUsers')
|
||||||
|
LOG_DELIVERY_GRANTEE = FakeGrantee(uri='http://acs.amazonaws.com/groups/s3/LogDelivery')
|
||||||
|
|
||||||
|
PERMISSION_FULL_CONTROL = 'FULL_CONTROL'
|
||||||
|
PERMISSION_WRITE = 'WRITE'
|
||||||
|
PERMISSION_READ = 'READ'
|
||||||
|
PERMISSION_WRITE_ACP = 'WRITE_ACP'
|
||||||
|
PERMISSION_READ_ACP = 'READ_ACP'
|
||||||
|
|
||||||
|
|
||||||
|
class FakeGrant(object):
|
||||||
|
def __init__(self, grantees, permissions):
|
||||||
|
self.grantees = grantees
|
||||||
|
self.permissions = permissions
|
||||||
|
|
||||||
|
|
||||||
|
class FakeAcl(object):
|
||||||
|
def __init__(self, grants=[]):
|
||||||
|
self.grants = grants
|
||||||
|
|
||||||
|
|
||||||
|
def get_canned_acl(acl):
|
||||||
|
owner_grantee = FakeGrantee(id='75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a')
|
||||||
|
grants = [FakeGrant([owner_grantee], [PERMISSION_FULL_CONTROL])]
|
||||||
|
if acl == 'private':
|
||||||
|
pass # no other permissions
|
||||||
|
elif acl == 'public-read':
|
||||||
|
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
|
||||||
|
elif acl == 'public-read-write':
|
||||||
|
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ, PERMISSION_WRITE]))
|
||||||
|
elif acl == 'authenticated-read':
|
||||||
|
grants.append(FakeGrant([AUTHENTICATED_USERS_GRANTEE], [PERMISSION_READ]))
|
||||||
|
elif acl == 'bucket-owner-read':
|
||||||
|
pass # TODO: bucket owner ACL
|
||||||
|
elif acl == 'bucket-owner-full-control':
|
||||||
|
pass # TODO: bucket owner ACL
|
||||||
|
elif acl == 'log-delivery-write':
|
||||||
|
grants.append(FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_READ_ACP, PERMISSION_WRITE]))
|
||||||
|
else:
|
||||||
|
assert False, 'Unknown canned acl: %s' % (acl,)
|
||||||
|
return FakeAcl(grants=grants)
|
||||||
|
|
||||||
|
|
||||||
class LifecycleRule(object):
|
class LifecycleRule(object):
|
||||||
def __init__(self, id=None, prefix=None, status=None, expiration_days=None,
|
def __init__(self, id=None, prefix=None, status=None, expiration_days=None,
|
||||||
expiration_date=None, transition_days=None,
|
expiration_date=None, transition_days=None,
|
||||||
@ -399,7 +458,7 @@ class S3Backend(BaseBackend):
|
|||||||
bucket = self.get_bucket(bucket_name)
|
bucket = self.get_bucket(bucket_name)
|
||||||
return bucket.keys.pop(key_name)
|
return bucket.keys.pop(key_name)
|
||||||
|
|
||||||
def copy_key(self, src_bucket_name, src_key_name, dest_bucket_name, dest_key_name, storage=None):
|
def copy_key(self, src_bucket_name, src_key_name, dest_bucket_name, dest_key_name, storage=None, acl=None):
|
||||||
src_key_name = clean_key_name(src_key_name)
|
src_key_name = clean_key_name(src_key_name)
|
||||||
dest_key_name = clean_key_name(dest_key_name)
|
dest_key_name = clean_key_name(dest_key_name)
|
||||||
src_bucket = self.get_bucket(src_bucket_name)
|
src_bucket = self.get_bucket(src_bucket_name)
|
||||||
@ -409,6 +468,8 @@ class S3Backend(BaseBackend):
|
|||||||
key = key.copy(dest_key_name)
|
key = key.copy(dest_key_name)
|
||||||
dest_bucket.keys[dest_key_name] = key
|
dest_bucket.keys[dest_key_name] = key
|
||||||
if storage is not None:
|
if storage is not None:
|
||||||
dest_bucket.keys[dest_key_name].set_storage_class(storage)
|
key.set_storage_class(storage)
|
||||||
|
if acl is not None:
|
||||||
|
key.set_acl(acl)
|
||||||
|
|
||||||
s3_backend = S3Backend()
|
s3_backend = S3Backend()
|
||||||
|
@ -9,7 +9,7 @@ import xmltodict
|
|||||||
from moto.core.responses import _TemplateEnvironmentMixin
|
from moto.core.responses import _TemplateEnvironmentMixin
|
||||||
|
|
||||||
from .exceptions import BucketAlreadyExists, S3ClientError, InvalidPartOrder
|
from .exceptions import BucketAlreadyExists, S3ClientError, InvalidPartOrder
|
||||||
from .models import s3_backend
|
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl
|
||||||
from .utils import bucket_name_from_url, metadata_from_headers
|
from .utils import bucket_name_from_url, metadata_from_headers
|
||||||
from xml.dom import minidom
|
from xml.dom import minidom
|
||||||
|
|
||||||
@ -301,7 +301,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
|
|
||||||
def _key_response(self, request, full_url, headers):
|
def _key_response(self, request, full_url, headers):
|
||||||
parsed_url = urlparse(full_url)
|
parsed_url = urlparse(full_url)
|
||||||
query = parse_qs(parsed_url.query)
|
query = parse_qs(parsed_url.query, keep_blank_values=True)
|
||||||
method = request.method
|
method = request.method
|
||||||
|
|
||||||
key_name = self.parse_key_name(parsed_url.path)
|
key_name = self.parse_key_name(parsed_url.path)
|
||||||
@ -317,18 +317,18 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
if method == 'GET':
|
if method == 'GET':
|
||||||
return self._key_response_get(bucket_name, query, key_name, headers)
|
return self._key_response_get(bucket_name, query, key_name, headers)
|
||||||
elif method == 'PUT':
|
elif method == 'PUT':
|
||||||
return self._key_response_put(request, parsed_url, body, bucket_name, query, key_name, headers)
|
return self._key_response_put(request, body, bucket_name, query, key_name, headers)
|
||||||
elif method == 'HEAD':
|
elif method == 'HEAD':
|
||||||
return self._key_response_head(bucket_name, key_name, headers)
|
return self._key_response_head(bucket_name, key_name, headers)
|
||||||
elif method == 'DELETE':
|
elif method == 'DELETE':
|
||||||
return self._key_response_delete(bucket_name, query, key_name, headers)
|
return self._key_response_delete(bucket_name, query, key_name, headers)
|
||||||
elif method == 'POST':
|
elif method == 'POST':
|
||||||
return self._key_response_post(request, body, parsed_url, bucket_name, query, key_name, headers)
|
return self._key_response_post(request, body, bucket_name, query, key_name, headers)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))
|
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))
|
||||||
|
|
||||||
def _key_response_get(self, bucket_name, query, key_name, headers):
|
def _key_response_get(self, bucket_name, query, key_name, headers):
|
||||||
if 'uploadId' in query:
|
if query.get('uploadId'):
|
||||||
upload_id = query['uploadId'][0]
|
upload_id = query['uploadId'][0]
|
||||||
parts = self.backend.list_multipart(bucket_name, upload_id)
|
parts = self.backend.list_multipart(bucket_name, upload_id)
|
||||||
template = self.response_template(S3_MULTIPART_LIST_RESPONSE)
|
template = self.response_template(S3_MULTIPART_LIST_RESPONSE)
|
||||||
@ -342,14 +342,18 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
version_id = query.get('versionId', [None])[0]
|
version_id = query.get('versionId', [None])[0]
|
||||||
key = self.backend.get_key(
|
key = self.backend.get_key(
|
||||||
bucket_name, key_name, version_id=version_id)
|
bucket_name, key_name, version_id=version_id)
|
||||||
|
if 'acl' in query:
|
||||||
|
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
|
||||||
|
return 200, headers, template.render(key=key)
|
||||||
|
|
||||||
if key:
|
if key:
|
||||||
headers.update(key.metadata)
|
headers.update(key.metadata)
|
||||||
return 200, headers, key.value
|
return 200, headers, key.value
|
||||||
else:
|
else:
|
||||||
return 404, headers, ""
|
return 404, headers, ""
|
||||||
|
|
||||||
def _key_response_put(self, request, parsed_url, body, bucket_name, query, key_name, headers):
|
def _key_response_put(self, request, body, bucket_name, query, key_name, headers):
|
||||||
if 'uploadId' in query and 'partNumber' in query:
|
if query.get('uploadId') and query.get('partNumber'):
|
||||||
upload_id = query['uploadId'][0]
|
upload_id = query['uploadId'][0]
|
||||||
part_number = int(query['partNumber'][0])
|
part_number = int(query['partNumber'][0])
|
||||||
if 'x-amz-copy-source' in request.headers:
|
if 'x-amz-copy-source' in request.headers:
|
||||||
@ -368,16 +372,19 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
return 200, headers, response
|
return 200, headers, response
|
||||||
|
|
||||||
storage_class = request.headers.get('x-amz-storage-class', 'STANDARD')
|
storage_class = request.headers.get('x-amz-storage-class', 'STANDARD')
|
||||||
|
acl = self._acl_from_headers(request.headers)
|
||||||
|
|
||||||
if parsed_url.query == 'acl':
|
if 'acl' in query:
|
||||||
# We don't implement ACL yet, so just return
|
key = self.backend.get_key(bucket_name, key_name)
|
||||||
|
# TODO: Support the XML-based ACL format
|
||||||
|
key.set_acl(acl)
|
||||||
return 200, headers, ""
|
return 200, headers, ""
|
||||||
|
|
||||||
if 'x-amz-copy-source' in request.headers:
|
if 'x-amz-copy-source' in request.headers:
|
||||||
# Copy key
|
# Copy key
|
||||||
src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1)
|
src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1)
|
||||||
self.backend.copy_key(src_bucket, src_key, bucket_name, key_name,
|
self.backend.copy_key(src_bucket, src_key, bucket_name, key_name,
|
||||||
storage=storage_class)
|
storage=storage_class, acl=acl)
|
||||||
mdirective = request.headers.get('x-amz-metadata-directive')
|
mdirective = request.headers.get('x-amz-metadata-directive')
|
||||||
if mdirective is not None and mdirective == 'REPLACE':
|
if mdirective is not None and mdirective == 'REPLACE':
|
||||||
new_key = self.backend.get_key(bucket_name, key_name)
|
new_key = self.backend.get_key(bucket_name, key_name)
|
||||||
@ -400,6 +407,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
request.streaming = True
|
request.streaming = True
|
||||||
metadata = metadata_from_headers(request.headers)
|
metadata = metadata_from_headers(request.headers)
|
||||||
new_key.set_metadata(metadata)
|
new_key.set_metadata(metadata)
|
||||||
|
new_key.set_acl(acl)
|
||||||
|
|
||||||
template = self.response_template(S3_OBJECT_RESPONSE)
|
template = self.response_template(S3_OBJECT_RESPONSE)
|
||||||
headers.update(new_key.response_dict)
|
headers.update(new_key.response_dict)
|
||||||
@ -414,8 +422,40 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
else:
|
else:
|
||||||
return 404, headers, ""
|
return 404, headers, ""
|
||||||
|
|
||||||
|
def _acl_from_headers(self, headers):
|
||||||
|
canned_acl = headers.get('x-amz-acl', '')
|
||||||
|
if canned_acl:
|
||||||
|
return get_canned_acl(canned_acl)
|
||||||
|
|
||||||
|
grants = []
|
||||||
|
for header, value in headers.items():
|
||||||
|
if not header.startswith('x-amz-grant-'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
permission = {
|
||||||
|
'read': 'READ',
|
||||||
|
'write': 'WRITE',
|
||||||
|
'read-acp': 'READ_ACP',
|
||||||
|
'write-acp': 'WRITE_ACP',
|
||||||
|
'full-control': 'FULL_CONTROL',
|
||||||
|
}[header[len('x-amz-grant-'):]]
|
||||||
|
|
||||||
|
grantees = []
|
||||||
|
for key_and_value in value.split(","):
|
||||||
|
key, value = re.match('([^=]+)="([^"]+)"', key_and_value.strip()).groups()
|
||||||
|
if key.lower() == 'id':
|
||||||
|
grantees.append(FakeGrantee(id=value))
|
||||||
|
else:
|
||||||
|
grantees.append(FakeGrantee(uri=value))
|
||||||
|
grants.append(FakeGrant(grantees, [permission]))
|
||||||
|
|
||||||
|
if grants:
|
||||||
|
return FakeAcl(grants)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def _key_response_delete(self, bucket_name, query, key_name, headers):
|
def _key_response_delete(self, bucket_name, query, key_name, headers):
|
||||||
if 'uploadId' in query:
|
if query.get('uploadId'):
|
||||||
upload_id = query['uploadId'][0]
|
upload_id = query['uploadId'][0]
|
||||||
self.backend.cancel_multipart(bucket_name, upload_id)
|
self.backend.cancel_multipart(bucket_name, upload_id)
|
||||||
return 204, headers, ""
|
return 204, headers, ""
|
||||||
@ -435,8 +475,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
raise InvalidPartOrder()
|
raise InvalidPartOrder()
|
||||||
yield (pn, p.getElementsByTagName('ETag')[0].firstChild.wholeText)
|
yield (pn, p.getElementsByTagName('ETag')[0].firstChild.wholeText)
|
||||||
|
|
||||||
def _key_response_post(self, request, body, parsed_url, bucket_name, query, key_name, headers):
|
def _key_response_post(self, request, body, bucket_name, query, key_name, headers):
|
||||||
if body == b'' and parsed_url.query == 'uploads':
|
if body == b'' and 'uploads' in query:
|
||||||
metadata = metadata_from_headers(request.headers)
|
metadata = metadata_from_headers(request.headers)
|
||||||
multipart = self.backend.initiate_multipart(bucket_name, key_name, metadata)
|
multipart = self.backend.initiate_multipart(bucket_name, key_name, metadata)
|
||||||
|
|
||||||
@ -448,7 +488,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
)
|
)
|
||||||
return 200, headers, response
|
return 200, headers, response
|
||||||
|
|
||||||
if 'uploadId' in query:
|
if query.get('uploadId'):
|
||||||
body = self._complete_multipart_body(body)
|
body = self._complete_multipart_body(body)
|
||||||
upload_id = query['uploadId'][0]
|
upload_id = query['uploadId'][0]
|
||||||
key = self.backend.complete_multipart(bucket_name, upload_id, body)
|
key = self.backend.complete_multipart(bucket_name, upload_id, body)
|
||||||
@ -458,7 +498,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
|||||||
key_name=key.name,
|
key_name=key.name,
|
||||||
etag=key.etag,
|
etag=key.etag,
|
||||||
)
|
)
|
||||||
elif parsed_url.query == 'restore':
|
elif 'restore' in query:
|
||||||
es = minidom.parseString(body).getElementsByTagName('Days')
|
es = minidom.parseString(body).getElementsByTagName('Days')
|
||||||
days = es[0].childNodes[0].wholeText
|
days = es[0].childNodes[0].wholeText
|
||||||
key = self.backend.get_key(bucket_name, key_name)
|
key = self.backend.get_key(bucket_name, key_name)
|
||||||
@ -642,6 +682,37 @@ S3_OBJECT_RESPONSE = """<PutObjectResponse xmlns="http://s3.amazonaws.com/doc/20
|
|||||||
</PutObjectResponse>
|
</PutObjectResponse>
|
||||||
</PutObjectResponse>"""
|
</PutObjectResponse>"""
|
||||||
|
|
||||||
|
S3_OBJECT_ACL_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||||
|
<Owner>
|
||||||
|
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
|
||||||
|
<DisplayName>webfile</DisplayName>
|
||||||
|
</Owner>
|
||||||
|
<AccessControlList>
|
||||||
|
{% for grant in key.acl.grants %}
|
||||||
|
<Grant>
|
||||||
|
{% for grantee in grant.grantees %}
|
||||||
|
<Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:type="{{ grantee.type }}">
|
||||||
|
{% if grantee.uri %}
|
||||||
|
<URI>{{ grantee.uri }}</URI>
|
||||||
|
{% endif %}
|
||||||
|
{% if grantee.id %}
|
||||||
|
<ID>{{ grantee.id }}</ID>
|
||||||
|
{% endif %}
|
||||||
|
{% if grantee.display_name %}
|
||||||
|
<DisplayName>{{ grantee.display_name }}</DisplayName>
|
||||||
|
{% endif %}
|
||||||
|
</Grantee>
|
||||||
|
{% endfor %}
|
||||||
|
{% for permission in grant.permissions %}
|
||||||
|
<Permission>{{ permission }}</Permission>
|
||||||
|
{% endfor %}
|
||||||
|
</Grant>
|
||||||
|
{% endfor %}
|
||||||
|
</AccessControlList>
|
||||||
|
</AccessControlPolicy>"""
|
||||||
|
|
||||||
S3_OBJECT_COPY_RESPONSE = """<CopyObjectResponse xmlns="http://doc.s3.amazonaws.com/2006-03-01">
|
S3_OBJECT_COPY_RESPONSE = """<CopyObjectResponse xmlns="http://doc.s3.amazonaws.com/2006-03-01">
|
||||||
<CopyObjectResponse>
|
<CopyObjectResponse>
|
||||||
<ETag>{{ key.etag }}</ETag>
|
<ETag>{{ key.etag }}</ETag>
|
||||||
@ -717,7 +788,7 @@ S3_ALL_MULTIPARTS = """<?xml version="1.0" encoding="UTF-8"?>
|
|||||||
</Initiator>
|
</Initiator>
|
||||||
<Owner>
|
<Owner>
|
||||||
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
|
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
|
||||||
<DisplayName>OwnerDisplayName</DisplayName>
|
<DisplayName>webfile</DisplayName>
|
||||||
</Owner>
|
</Owner>
|
||||||
<StorageClass>STANDARD</StorageClass>
|
<StorageClass>STANDARD</StorageClass>
|
||||||
<Initiated>2010-11-10T20:48:33.000Z</Initiated>
|
<Initiated>2010-11-10T20:48:33.000Z</Initiated>
|
||||||
|
@ -726,7 +726,7 @@ def test_list_versions():
|
|||||||
|
|
||||||
|
|
||||||
@mock_s3
|
@mock_s3
|
||||||
def test_acl_is_ignored_for_now():
|
def test_acl_setting():
|
||||||
conn = boto.connect_s3()
|
conn = boto.connect_s3()
|
||||||
bucket = conn.create_bucket('foobar')
|
bucket = conn.create_bucket('foobar')
|
||||||
content = b'imafile'
|
content = b'imafile'
|
||||||
@ -741,6 +741,49 @@ def test_acl_is_ignored_for_now():
|
|||||||
|
|
||||||
assert key.get_contents_as_string() == content
|
assert key.get_contents_as_string() == content
|
||||||
|
|
||||||
|
grants = key.get_acl().acl.grants
|
||||||
|
assert any(g.uri == 'http://acs.amazonaws.com/groups/global/AllUsers' and
|
||||||
|
g.permission == 'READ' for g in grants), grants
|
||||||
|
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_acl_setting_via_headers():
|
||||||
|
conn = boto.connect_s3()
|
||||||
|
bucket = conn.create_bucket('foobar')
|
||||||
|
content = b'imafile'
|
||||||
|
keyname = 'test.txt'
|
||||||
|
|
||||||
|
key = Key(bucket, name=keyname)
|
||||||
|
key.content_type = 'text/plain'
|
||||||
|
key.set_contents_from_string(content, headers={
|
||||||
|
'x-amz-grant-full-control': 'uri="http://acs.amazonaws.com/groups/global/AllUsers"'
|
||||||
|
})
|
||||||
|
|
||||||
|
key = bucket.get_key(keyname)
|
||||||
|
|
||||||
|
assert key.get_contents_as_string() == content
|
||||||
|
|
||||||
|
grants = key.get_acl().acl.grants
|
||||||
|
assert any(g.uri == 'http://acs.amazonaws.com/groups/global/AllUsers' and
|
||||||
|
g.permission == 'FULL_CONTROL' for g in grants), grants
|
||||||
|
|
||||||
|
|
||||||
|
@mock_s3
|
||||||
|
def test_acl_switching():
|
||||||
|
conn = boto.connect_s3()
|
||||||
|
bucket = conn.create_bucket('foobar')
|
||||||
|
content = b'imafile'
|
||||||
|
keyname = 'test.txt'
|
||||||
|
|
||||||
|
key = Key(bucket, name=keyname)
|
||||||
|
key.content_type = 'text/plain'
|
||||||
|
key.set_contents_from_string(content, policy='public-read')
|
||||||
|
key.set_acl('private')
|
||||||
|
|
||||||
|
grants = key.get_acl().acl.grants
|
||||||
|
assert not any(g.uri == 'http://acs.amazonaws.com/groups/global/AllUsers' and
|
||||||
|
g.permission == 'READ' for g in grants), grants
|
||||||
|
|
||||||
|
|
||||||
@mock_s3
|
@mock_s3
|
||||||
def test_unicode_key():
|
def test_unicode_key():
|
||||||
|
Loading…
Reference in New Issue
Block a user