diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 2bd3e5c3a..03a0e6150 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -25,6 +25,7 @@ from moto.s3bucket_path.utils import (
from .exceptions import (
BucketAlreadyExists,
+ BucketAccessDeniedError,
BucketMustHaveLockeEnabled,
DuplicateTagKeys,
InvalidContentMD5,
@@ -954,9 +955,13 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
if self.is_delete_keys(request, path, bucket_name):
self.data["Action"] = "DeleteObject"
- self._authenticate_and_authorize_s3_action()
-
- return self._bucket_response_delete_keys(body, bucket_name)
+ try:
+ self._authenticate_and_authorize_s3_action()
+ return self._bucket_response_delete_keys(body, bucket_name)
+ except BucketAccessDeniedError:
+ return self._bucket_response_delete_keys(
+ body, bucket_name, authenticated=False
+ )
self.data["Action"] = "PutObject"
self._authenticate_and_authorize_s3_action()
@@ -1018,7 +1023,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
else path_url(request.url)
)
- def _bucket_response_delete_keys(self, body, bucket_name):
+ def _bucket_response_delete_keys(self, body, bucket_name, authenticated=True):
template = self.response_template(S3_DELETE_KEYS_RESPONSE)
body_dict = xmltodict.parse(body, strip_whitespace=False)
@@ -1030,13 +1035,18 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
if len(objects) == 0:
raise MalformedXML()
- deleted_objects = self.backend.delete_objects(bucket_name, objects)
- error_names = []
+ if authenticated:
+ deleted_objects = self.backend.delete_objects(bucket_name, objects)
+ errors = []
+ else:
+ deleted_objects = []
+ # [(key_name, errorcode, 'error message'), ..]
+ errors = [(o["Key"], "AccessDenied", "Access Denied") for o in objects]
return (
200,
{},
- template.render(deleted=deleted_objects, delete_errors=error_names),
+ template.render(deleted=deleted_objects, delete_errors=errors),
)
def _handle_range_header(self, request, response_headers, response_content):
@@ -2285,9 +2295,11 @@ S3_DELETE_KEYS_RESPONSE = """
{% if v %}{{v}}{% endif %}
{% endfor %}
-{% for k in delete_errors %}
+{% for k,c,m in delete_errors %}
{{k}}
+{{c}}
+{{m}}
{% endfor %}
"""
diff --git a/tests/test_s3/test_s3_auth.py b/tests/test_s3/test_s3_auth.py
index af6d1cddc..feb22f035 100644
--- a/tests/test_s3/test_s3_auth.py
+++ b/tests/test_s3/test_s3_auth.py
@@ -4,8 +4,8 @@ import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
-from moto import mock_iam, mock_s3, settings
-from moto.core import set_initial_no_auth_action_count
+from moto import mock_iam, mock_s3, mock_sts, settings
+from moto.core import ACCOUNT_ID, set_initial_no_auth_action_count
from unittest import SkipTest
@@ -116,3 +116,91 @@ def create_user_with_access_key_and_policy(user_name="test-user"):
# Return the access keys
return client.create_access_key(UserName=user_name)["AccessKey"]
+
+
+@mock_iam
+@mock_sts
+def create_role_with_attached_policy_and_assume_it(
+ role_name,
+ trust_policy_document,
+ policy_document,
+ session_name="session1",
+ policy_name="policy1",
+):
+ iam_client = boto3.client("iam", region_name="us-east-1")
+ sts_client = boto3.client("sts", region_name="us-east-1")
+ role_arn = iam_client.create_role(
+ RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy_document)
+ )["Role"]["Arn"]
+ policy_arn = iam_client.create_policy(
+ PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
+ )["Policy"]["Arn"]
+ iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
+ return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)[
+ "Credentials"
+ ]
+
+
+@set_initial_no_auth_action_count(7)
+@mock_iam
+@mock_s3
+@mock_sts
+def test_delete_objects_without_access_throws_custom_error():
+ if settings.TEST_SERVER_MODE:
+ raise SkipTest("Auth decorator does not work in server mode")
+
+ role_name = "some-test-role"
+ bucket_name = "some-test-bucket"
+
+ # Setup Bucket
+ client = boto3.client("s3", region_name="us-east-1")
+ client.create_bucket(Bucket=bucket_name)
+ client.put_object(Bucket=bucket_name, Key="some/prefix/test_file.txt")
+
+ trust_policy_document = {
+ "Version": "2012-10-17",
+ "Statement": {
+ "Effect": "Allow",
+ "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
+ "Action": "sts:AssumeRole",
+ },
+ }
+
+ # Setup User with the correct access
+ policy_document = {
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
+ "Resource": f"arn:aws:s3:::{bucket_name}",
+ },
+ {
+ "Effect": "Allow",
+ "Action": ["s3:PutObject", "s3:GetObject"],
+ "Resource": f"arn:aws:s3:::{bucket_name}/*",
+ },
+ ],
+ }
+ credentials = create_role_with_attached_policy_and_assume_it(
+ role_name, trust_policy_document, policy_document
+ )
+
+ session = boto3.session.Session(region_name="us-east-1")
+ s3_resource = session.resource(
+ "s3",
+ aws_access_key_id=credentials["AccessKeyId"],
+ aws_secret_access_key=credentials["SecretAccessKey"],
+ aws_session_token=credentials["SessionToken"],
+ )
+ bucket = s3_resource.Bucket(bucket_name)
+
+ # This action is not allowed
+ # It should return a 200-response, with the body indicating that we do not have access
+ response = bucket.objects.filter(Prefix="some/prefix").delete()[0]
+ response.should.have.key("Errors").length_of(1)
+
+ error = response["Errors"][0]
+ error.should.have.key("Key").equals("some/prefix/test_file.txt")
+ error.should.have.key("Code").equals("AccessDenied")
+ error.should.have.key("Message").equals("Access Denied")