enforce s3 acls
This commit is contained in:
parent
35feb9feb7
commit
2055bb62f5
@ -201,10 +201,18 @@ class FakeGrantee(BaseModel):
|
||||
self.uri = uri
|
||||
self.display_name = display_name
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, FakeGrantee):
|
||||
return False
|
||||
return self.id == other.id and self.uri == other.uri and self.display_name == other.display_name
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
return 'Group' if self.uri else 'CanonicalUser'
|
||||
|
||||
def __repr__(self):
|
||||
return "FakeGrantee(display_name: '{}', id: '{}', uri: '{}')".format(self.display_name, self.id, self.uri)
|
||||
|
||||
|
||||
ALL_USERS_GRANTEE = FakeGrantee(
|
||||
uri='http://acs.amazonaws.com/groups/global/AllUsers')
|
||||
@ -226,12 +234,28 @@ class FakeGrant(BaseModel):
|
||||
self.grantees = grantees
|
||||
self.permissions = permissions
|
||||
|
||||
def __repr__(self):
|
||||
return "FakeGrant(grantees: {}, permissions: {})".format(self.grantees, self.permissions)
|
||||
|
||||
|
||||
class FakeAcl(BaseModel):
|
||||
|
||||
def __init__(self, grants=[]):
|
||||
self.grants = grants
|
||||
|
||||
@property
|
||||
def public_read(self):
|
||||
for grant in self.grants:
|
||||
if ALL_USERS_GRANTEE in grant.grantees:
|
||||
if PERMISSION_READ in grant.permissions:
|
||||
return True
|
||||
if PERMISSION_FULL_CONTROL in grant.permissions:
|
||||
return True
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return "FakeAcl(grants: {})".format(self.grants)
|
||||
|
||||
|
||||
def get_canned_acl(acl):
|
||||
owner_grantee = FakeGrantee(
|
||||
|
@ -373,9 +373,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
self.backend.set_bucket_policy(bucket_name, body)
|
||||
return 'True'
|
||||
elif 'acl' in querystring:
|
||||
acl = self._acl_from_headers(request.headers)
|
||||
# TODO: Support the XML-based ACL format
|
||||
self.backend.set_bucket_acl(bucket_name, acl)
|
||||
self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers))
|
||||
return ""
|
||||
elif "tagging" in querystring:
|
||||
tagging = self._bucket_tagging_from_xml(body)
|
||||
@ -407,6 +406,11 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
new_bucket = self.backend.get_bucket(bucket_name)
|
||||
else:
|
||||
raise
|
||||
|
||||
if 'x-amz-acl' in request.headers:
|
||||
# TODO: Support the XML-based ACL format
|
||||
self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers))
|
||||
|
||||
template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
|
||||
return 200, {}, template.render(bucket=new_bucket)
|
||||
|
||||
@ -536,6 +540,17 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
key_name = self.parse_key_name(request, parsed_url.path)
|
||||
bucket_name = self.parse_bucket_name_from_url(request, full_url)
|
||||
|
||||
# Because we patch the requests library the boto/boto3 API
|
||||
# requests go through this method but so do
|
||||
# `requests.get("https://bucket-name.s3.amazonaws.com/file-name")`
|
||||
# Here we deny public access to private files by checking the
|
||||
# ACL and checking for the mere presence of an Authorization
|
||||
# header.
|
||||
if 'Authorization' not in request.headers:
|
||||
key = self.backend.get_key(bucket_name, key_name)
|
||||
if key and not key.acl.public_read:
|
||||
return 403, {}, ""
|
||||
|
||||
if hasattr(request, 'body'):
|
||||
# Boto
|
||||
body = request.body
|
||||
@ -725,7 +740,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
|
||||
if grants:
|
||||
return FakeAcl(grants)
|
||||
else:
|
||||
return None
|
||||
return get_canned_acl('private')
|
||||
|
||||
def _tagging_from_headers(self, headers):
|
||||
if headers.get('x-amz-tagging'):
|
||||
|
@ -864,6 +864,36 @@ def test_bucket_acl_switching():
|
||||
g.permission == 'READ' for g in grants), grants
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_object_in_public_bucket():
|
||||
s3 = boto3.resource('s3')
|
||||
bucket = s3.Bucket('test-bucket')
|
||||
bucket.create(ACL='public-read')
|
||||
bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt')
|
||||
direct_url = 'https://test-bucket.s3.amazonaws.com/file.txt'
|
||||
response = requests.get(direct_url)
|
||||
response.status_code.should.equal(200)
|
||||
response.content.should.equal(b'ABCD')
|
||||
bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt')
|
||||
response = requests.get(direct_url)
|
||||
response.status_code.should.equal(403)
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_object_in_private_bucket():
|
||||
s3 = boto3.resource('s3')
|
||||
bucket = s3.Bucket('test-bucket')
|
||||
bucket.create(ACL='private')
|
||||
bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt')
|
||||
direct_url = 'https://test-bucket.s3.amazonaws.com/file.txt'
|
||||
response = requests.get(direct_url)
|
||||
response.status_code.should.equal(403)
|
||||
bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt')
|
||||
response = requests.get(direct_url)
|
||||
response.status_code.should.equal(200)
|
||||
response.content.should.equal(b'ABCD')
|
||||
|
||||
|
||||
@mock_s3_deprecated
|
||||
def test_unicode_key():
|
||||
conn = boto.connect_s3()
|
||||
|
Loading…
Reference in New Issue
Block a user