From 2055bb62f505f376fc049efdd8418f82af01ee86 Mon Sep 17 00:00:00 2001 From: Jack Danger Date: Sat, 16 Sep 2017 06:08:27 -0700 Subject: [PATCH] enforce s3 acls --- moto/s3/models.py | 24 ++++++++++++++++++++++++ moto/s3/responses.py | 21 ++++++++++++++++++--- tests/test_s3/test_s3.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/moto/s3/models.py b/moto/s3/models.py index abe92bdf1..ae05292f2 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -201,10 +201,18 @@ class FakeGrantee(BaseModel): self.uri = uri self.display_name = display_name + def __eq__(self, other): + if not isinstance(other, FakeGrantee): + return False + return self.id == other.id and self.uri == other.uri and self.display_name == other.display_name + @property def type(self): return 'Group' if self.uri else 'CanonicalUser' + def __repr__(self): + return "FakeGrantee(display_name: '{}', id: '{}', uri: '{}')".format(self.display_name, self.id, self.uri) + ALL_USERS_GRANTEE = FakeGrantee( uri='http://acs.amazonaws.com/groups/global/AllUsers') @@ -226,12 +234,28 @@ class FakeGrant(BaseModel): self.grantees = grantees self.permissions = permissions + def __repr__(self): + return "FakeGrant(grantees: {}, permissions: {})".format(self.grantees, self.permissions) + class FakeAcl(BaseModel): def __init__(self, grants=[]): self.grants = grants + @property + def public_read(self): + for grant in self.grants: + if ALL_USERS_GRANTEE in grant.grantees: + if PERMISSION_READ in grant.permissions: + return True + if PERMISSION_FULL_CONTROL in grant.permissions: + return True + return False + + def __repr__(self): + return "FakeAcl(grants: {})".format(self.grants) + def get_canned_acl(acl): owner_grantee = FakeGrantee( diff --git a/moto/s3/responses.py b/moto/s3/responses.py index 4da888de5..fbd142a34 100755 --- a/moto/s3/responses.py +++ b/moto/s3/responses.py @@ -373,9 +373,8 @@ class ResponseObject(_TemplateEnvironmentMixin): self.backend.set_bucket_policy(bucket_name, body) return 'True' elif 'acl' in querystring: - acl = self._acl_from_headers(request.headers) # TODO: Support the XML-based ACL format - self.backend.set_bucket_acl(bucket_name, acl) + self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers)) return "" elif "tagging" in querystring: tagging = self._bucket_tagging_from_xml(body) @@ -407,6 +406,11 @@ class ResponseObject(_TemplateEnvironmentMixin): new_bucket = self.backend.get_bucket(bucket_name) else: raise + + if 'x-amz-acl' in request.headers: + # TODO: Support the XML-based ACL format + self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers)) + template = self.response_template(S3_BUCKET_CREATE_RESPONSE) return 200, {}, template.render(bucket=new_bucket) @@ -536,6 +540,17 @@ class ResponseObject(_TemplateEnvironmentMixin): key_name = self.parse_key_name(request, parsed_url.path) bucket_name = self.parse_bucket_name_from_url(request, full_url) + # Because we patch the requests library the boto/boto3 API + # requests go through this method but so do + # `requests.get("https://bucket-name.s3.amazonaws.com/file-name")` + # Here we deny public access to private files by checking the + # ACL and checking for the mere presence of an Authorization + # header. + if 'Authorization' not in request.headers: + key = self.backend.get_key(bucket_name, key_name) + if key and not key.acl.public_read: + return 403, {}, "" + if hasattr(request, 'body'): # Boto body = request.body @@ -725,7 +740,7 @@ class ResponseObject(_TemplateEnvironmentMixin): if grants: return FakeAcl(grants) else: - return None + return get_canned_acl('private') def _tagging_from_headers(self, headers): if headers.get('x-amz-tagging'): diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py index 331452a7d..f7898fcb4 100644 --- a/tests/test_s3/test_s3.py +++ b/tests/test_s3/test_s3.py @@ -864,6 +864,36 @@ def test_bucket_acl_switching(): g.permission == 'READ' for g in grants), grants +@mock_s3 +def test_s3_object_in_public_bucket(): + s3 = boto3.resource('s3') + bucket = s3.Bucket('test-bucket') + bucket.create(ACL='public-read') + bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt') + direct_url = 'https://test-bucket.s3.amazonaws.com/file.txt' + response = requests.get(direct_url) + response.status_code.should.equal(200) + response.content.should.equal(b'ABCD') + bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt') + response = requests.get(direct_url) + response.status_code.should.equal(403) + + +@mock_s3 +def test_s3_object_in_private_bucket(): + s3 = boto3.resource('s3') + bucket = s3.Bucket('test-bucket') + bucket.create(ACL='private') + bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt') + direct_url = 'https://test-bucket.s3.amazonaws.com/file.txt' + response = requests.get(direct_url) + response.status_code.should.equal(403) + bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt') + response = requests.get(direct_url) + response.status_code.should.equal(200) + response.content.should.equal(b'ABCD') + + @mock_s3_deprecated def test_unicode_key(): conn = boto.connect_s3()