S3: Improve bucket policy enforcement (#5883)

This commit is contained in:
Bert Blommers 2023-01-31 10:33:57 -01:00 committed by GitHub
parent 67197cb8a5
commit ad016112fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 30 deletions

View File

@ -119,6 +119,14 @@ s3
- [X] put_bucket_ownership_controls
- [X] put_bucket_policy
Basic policy enforcement is in place.
Restrictions:
- Only statements with principal=* are taken into account
- Conditions are not taken into account
- [X] put_bucket_replication
- [ ] put_bucket_request_payment
- [X] put_bucket_tagging

View File

@ -376,6 +376,8 @@ class IAMPolicyStatement(object):
is_action_concerned = True
if is_action_concerned:
if self.is_unknown_principal(self._statement.get("Principal")):
return PermissionResult.NEUTRAL
same_resource = self._match(self._statement["Resource"], resource)
if self._statement["Effect"] == "Allow" and same_resource:
return PermissionResult.PERMITTED
@ -384,6 +386,21 @@ class IAMPolicyStatement(object):
else:
return PermissionResult.NEUTRAL
def is_unknown_principal(self, principal) -> bool:
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html
# For now, Moto only verifies principal == *
# 'Unknown' principals are not verified
#
# This should be extended to check:
# - Can the principal be empty? How behaves AWS?
# - allow one/multiple account ARN's
# - allow one/multiple rules
if principal is None:
return False
if isinstance(principal, str) and principal != "*":
return True
return False
def _check_element_matches(self, statement_element, value):
if isinstance(self._statement[statement_element], list):
for statement_element_value in self._statement[statement_element]:

View File

@ -12,6 +12,7 @@ import sys
import urllib.parse
from bisect import insort
from typing import Optional
from importlib import reload
from moto.core import BaseBackend, BaseModel, BackendDict, CloudFormationModel
from moto.core import CloudWatchMetricProvider
@ -933,14 +934,14 @@ class FakeBucket(CloudFormationModel):
def is_versioned(self):
return self.versioning_status == "Enabled"
def allow_action(self, action, resource):
if self.policy is None:
return False
def get_permission(self, action, resource):
from moto.iam.access_control import IAMPolicy, PermissionResult
if self.policy is None:
return PermissionResult.NEUTRAL
iam_policy = IAMPolicy(self.policy.decode())
result = iam_policy.is_action_permitted(action, resource)
return result == PermissionResult.PERMITTED
return iam_policy.is_action_permitted(action, resource)
def set_lifecycle(self, rules):
self.rules = []
@ -1679,6 +1680,13 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
return self.get_bucket(bucket_name).policy
def put_bucket_policy(self, bucket_name, policy):
"""
Basic policy enforcement is in place.
Restrictions:
- Only statements with principal=* are taken into account
- Conditions are not taken into account
"""
self.get_bucket(bucket_name).policy = policy
def delete_bucket_policy(self, bucket_name):
@ -1845,7 +1853,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
version_id=None,
part_number=None,
key_is_clean=False,
):
) -> Optional[FakeKey]:
if not key_is_clean:
key_name = clean_key_name(key_name)
bucket = self.get_bucket(bucket_name)

View File

@ -1150,29 +1150,48 @@ class S3Response(BaseResponse):
key_name = self.parse_key_name(request, parsed_url.path)
bucket_name = self.parse_bucket_name_from_url(request, full_url)
# Because we patch the requests library the boto/boto3 API
# requests go through this method but so do
# `requests.get("https://bucket-name.s3.amazonaws.com/file-name")`
# Here we deny public access to private files by checking the
# ACL and checking for the mere presence of an Authorization
# header.
if "Authorization" not in request.headers:
if hasattr(request, "url"):
signed_url = "Signature=" in request.url
elif hasattr(request, "requestline"):
signed_url = "Signature=" in request.path
# SDK requests tend to have Authorization set automatically
# If users make an HTTP-request, such as `requests.get("https://bucket-name.s3.amazonaws.com/file-name")`,
# The authorization-header may not be set
authorized_request = "Authorization" in request.headers
if hasattr(request, "url"):
signed_url = "Signature=" in request.url
elif hasattr(request, "requestline"):
signed_url = "Signature=" in request.path
try:
key = self.backend.get_object(bucket_name, key_name)
bucket = self.backend.get_bucket(bucket_name)
except S3ClientError:
key = bucket = None
if key:
resource = f"arn:aws:s3:::{bucket_name}/{key_name}"
if key and not signed_url:
bucket = self.backend.get_bucket(bucket_name)
resource = f"arn:aws:s3:::{bucket_name}/{key_name}"
bucket_policy_allows = bucket.allow_action("s3:GetObject", resource)
if not bucket_policy_allows and (key.acl and not key.acl.public_read):
# Authorization Workflow
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-auth-workflow-object-operation.html
# A bucket can deny all actions, regardless of who makes the request
from moto.iam.access_control import PermissionResult
action = f"s3:{method.upper()[0]}{method.lower()[1:]}Object"
bucket_permissions = bucket.get_permission(action, resource)
if bucket_permissions == PermissionResult.DENIED:
return 403, {}, ""
# If the request is not authorized, and not signed,
# that means that the action should be allowed for anonymous users
if not authorized_request and not signed_url:
# We already know that the bucket permissions do not explicitly deny this
# So bucket permissions are either not set, or do not explicitly allow
# Next check is to see if the ACL of the individual key allows this action
if bucket_permissions != PermissionResult.PERMITTED and (
key.acl and not key.acl.public_read
):
return 403, {}, ""
elif signed_url and not key:
# coming in from requests.get(s3.generate_presigned_url())
if self._invalid_headers(request.url, dict(request.headers)):
return 403, {}, S3_INVALID_PRESIGNED_PARAMETERS
elif signed_url and not authorized_request:
# coming in from requests.get(s3.generate_presigned_url())
if self._invalid_headers(request.url, dict(request.headers)):
return 403, {}, S3_INVALID_PRESIGNED_PARAMETERS
if hasattr(request, "body"):
# Boto

View File

@ -4,6 +4,7 @@ import requests
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto.moto_server.threaded_moto_server import ThreadedMotoServer
@ -36,24 +37,78 @@ class TestBucketPolicy:
({"resource": "arn:aws:s3:::mybucket/test_txt"}, 200),
({"resource": "arn:aws:s3:::notmybucket/*"}, 403),
({"resource": "arn:aws:s3:::mybucket/other*"}, 403),
({"actions": ["s3:PutObject"]}, 403),
({"effect": "Deny"}, 403),
],
)
def test_policy_allow_all(self, kwargs, status):
def test_block_or_allow_get_object(self, kwargs, status):
self._put_policy(**kwargs)
if status == 200:
self.client.get_object(Bucket="mybucket", Key="test_txt")
else:
with pytest.raises(ClientError):
self.client.get_object(Bucket="mybucket", Key="test_txt")
requests.get(self.key_name).status_code.should.equal(status)
def test_block_put_object(self):
# Block Put-access
self._put_policy(**{"effect": "Deny", "actions": ["s3:PutObject"]})
# GET still works
self.client.get_object(Bucket="mybucket", Key="test_txt")
# But Put (via boto3 or requests) is not allowed
with pytest.raises(ClientError) as exc:
self.client.put_object(Bucket="mybucket", Key="test_txt", Body="new data")
err = exc.value.response["Error"]
err["Message"].should.equal("Forbidden")
requests.put(self.key_name).status_code.should.equal(403)
def test_block_all_actions(self):
# Block all access
self._put_policy(**{"effect": "Deny", "actions": ["s3:*"]})
# Nothing works
with pytest.raises(ClientError) as exc:
self.client.get_object(Bucket="mybucket", Key="test_txt")
err = exc.value.response["Error"]
err["Message"].should.equal("Forbidden")
# But Put (via boto3 or requests) is not allowed
with pytest.raises(ClientError) as exc:
self.client.put_object(Bucket="mybucket", Key="test_txt", Body="new data")
err = exc.value.response["Error"]
err["Message"].should.equal("Forbidden")
requests.get(self.key_name).status_code.should.equal(403)
requests.put(self.key_name).status_code.should.equal(403)
# Allow access again, because we want to delete the object during teardown
self._put_policy(**{"effect": "Allow", "actions": ["s3:*"]})
def test_block_all_with_different_principal(self):
# Block all access for principal y
self._put_policy(**{"effect": "Deny", "actions": ["s3:*"], "principal": "y"})
# Everything works - Moto only blocks access for principal *
self.client.get_object(Bucket="mybucket", Key="test_txt")
self.client.put_object(Bucket="mybucket", Key="test_txt", Body="new data")
def _put_policy(
self, resource="arn:aws:s3:::mybucket/*", effect="Allow", actions=None
self,
resource="arn:aws:s3:::mybucket/*",
effect="Allow",
actions=None,
principal=None,
):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": effect,
"Principal": "*",
"Principal": principal or "*",
"Action": actions or ["s3:GetObject"],
"Resource": resource,
}