diff --git a/moto/s3/responses.py b/moto/s3/responses.py index 215fb0a13..6d8406667 100644 --- a/moto/s3/responses.py +++ b/moto/s3/responses.py @@ -80,6 +80,7 @@ DEFAULT_REGION_NAME = "us-east-1" ACTION_MAP = { "BUCKET": { + "HEAD": {"DEFAULT": "HeadBucket",}, "GET": { "uploads": "ListBucketMultipartUploads", "location": "GetBucketLocation", @@ -311,7 +312,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): body = "{0}".format(body).encode("utf-8") if method == "HEAD": - return self._bucket_response_head(bucket_name) + return self._bucket_response_head(bucket_name, querystring) elif method == "GET": return self._bucket_response_get(bucket_name, querystring) elif method == "PUT": @@ -335,7 +336,10 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): querystring = parse_qs(parsed_url.query, keep_blank_values=True) return querystring - def _bucket_response_head(self, bucket_name): + def _bucket_response_head(self, bucket_name, querystring): + self._set_action("BUCKET", "HEAD", querystring) + self._authenticate_and_authorize_s3_action() + try: self.backend.head_bucket(bucket_name) except MissingBucket: diff --git a/tests/test_s3/test_s3_auth.py b/tests/test_s3/test_s3_auth.py index ca3b4929d..a829e3190 100644 --- a/tests/test_s3/test_s3_auth.py +++ b/tests/test_s3/test_s3_auth.py @@ -1,8 +1,10 @@ import boto3 +import json import pytest +import sure # noqa from botocore.exceptions import ClientError -from moto import mock_s3, settings +from moto import mock_iam, mock_s3, settings from moto.core import set_initial_no_auth_action_count from unittest import SkipTest @@ -24,3 +26,93 @@ def test_load_unexisting_object_without_auth_should_return_403(): err["Message"].should.equal( "The AWS Access Key Id you provided does not exist in our records." ) + + +@set_initial_no_auth_action_count(4) +@mock_s3 +def test_head_bucket_with_correct_credentials(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Auth decorator does not work in server mode") + + # These calls are all unauthenticated + iam_keys = create_user_with_access_key_and_policy() + + # This S3-client has correct credentials + s3 = boto3.client( + "s3", + aws_access_key_id=iam_keys["AccessKeyId"], + aws_secret_access_key=iam_keys["SecretAccessKey"], + ) + s3.create_bucket(Bucket="mock_bucket") + + # Calling head_bucket with the correct credentials works + my_head_bucket( + "mock_bucket", + aws_access_key_id=iam_keys["AccessKeyId"], + aws_secret_access_key=iam_keys["SecretAccessKey"], + ) + + +@set_initial_no_auth_action_count(4) +@mock_s3 +def test_head_bucket_with_incorrect_credentials(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Auth decorator does not work in server mode") + + # These calls are all authenticated + iam_keys = create_user_with_access_key_and_policy() + + # Create the bucket with correct credentials + s3 = boto3.client( + "s3", + aws_access_key_id=iam_keys["AccessKeyId"], + aws_secret_access_key=iam_keys["SecretAccessKey"], + ) + s3.create_bucket(Bucket="mock_bucket") + + # Call head_bucket with incorrect credentials + with pytest.raises(ClientError) as ex: + my_head_bucket( + "mock_bucket", + aws_access_key_id=iam_keys["AccessKeyId"], + aws_secret_access_key="invalid", + ) + err = ex.value.response["Error"] + err["Code"].should.equal("SignatureDoesNotMatch") + err["Message"].should.equal( + "The request signature we calculated does not match the signature you provided. " + "Check your key and signing method." + ) + + +def my_head_bucket(bucket, aws_access_key_id, aws_secret_access_key): + s3_client = boto3.client( + "s3", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + s3_client.head_bucket(Bucket=bucket) + + +@mock_iam +def create_user_with_access_key_and_policy(user_name="test-user"): + """ + Should create a user with attached policy allowing read/write operations on S3. + """ + policy_document = { + "Version": "2012-10-17", + "Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}], + } + + # Create client and user + client = boto3.client("iam", region_name="us-east-1") + client.create_user(UserName=user_name) + + # Create and attach the policy + policy_arn = client.create_policy( + PolicyName="policy1", PolicyDocument=json.dumps(policy_document) + )["Policy"]["Arn"] + client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn) + + # Return the access keys + return client.create_access_key(UserName=user_name)["AccessKey"]