214 lines
7.0 KiB
Python
214 lines
7.0 KiB
Python
import json
|
|
from unittest import SkipTest
|
|
|
|
import boto3
|
|
import pytest
|
|
from botocore.exceptions import ClientError
|
|
|
|
from moto import mock_aws, settings
|
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
|
from moto.core import set_initial_no_auth_action_count
|
|
|
|
DEFAULT_REGION_NAME = "us-east-1"
|
|
|
|
|
|
@mock_aws
|
|
@set_initial_no_auth_action_count(0)
|
|
def test_load_unexisting_object_without_auth_should_return_403():
|
|
if not settings.TEST_DECORATOR_MODE:
|
|
raise SkipTest("Auth decorator does not work in server mode")
|
|
|
|
# Head an S3 object we should have no access to.
|
|
resource = boto3.resource("s3", region_name="us-east-1")
|
|
|
|
obj = resource.Object("myfakebucket", "myfakekey")
|
|
with pytest.raises(ClientError) as ex:
|
|
obj.load()
|
|
err = ex.value.response["Error"]
|
|
assert err["Code"] == "InvalidAccessKeyId"
|
|
assert err["Message"] == (
|
|
"The AWS Access Key Id you provided does not exist in our records."
|
|
)
|
|
|
|
|
|
@set_initial_no_auth_action_count(4)
|
|
@mock_aws
|
|
def test_head_bucket_with_correct_credentials():
|
|
if not settings.TEST_DECORATOR_MODE:
|
|
raise SkipTest("Auth decorator does not work in server mode")
|
|
|
|
# These calls are all unauthenticated
|
|
iam_keys = create_user_with_access_key_and_policy()
|
|
|
|
# This S3-client has correct credentials
|
|
s3_client = boto3.client(
|
|
"s3",
|
|
aws_access_key_id=iam_keys["AccessKeyId"],
|
|
aws_secret_access_key=iam_keys["SecretAccessKey"],
|
|
region_name=DEFAULT_REGION_NAME,
|
|
)
|
|
s3_client.create_bucket(Bucket="mock_bucket")
|
|
|
|
# Calling head_bucket with the correct credentials works
|
|
my_head_bucket(
|
|
"mock_bucket",
|
|
aws_access_key_id=iam_keys["AccessKeyId"],
|
|
aws_secret_access_key=iam_keys["SecretAccessKey"],
|
|
)
|
|
|
|
# Verify we can make calls that contain a querystring. Specifically,
|
|
# verify that we are able to build/match the AWS signature for a URL
|
|
# with a querystring.
|
|
s3_client.get_bucket_location(Bucket="mock_bucket")
|
|
s3_client.list_objects_v2(Bucket="mock_bucket")
|
|
|
|
|
|
@set_initial_no_auth_action_count(4)
|
|
@mock_aws
|
|
def test_head_bucket_with_incorrect_credentials():
|
|
if not settings.TEST_DECORATOR_MODE:
|
|
raise SkipTest("Auth decorator does not work in server mode")
|
|
|
|
# These calls are all authenticated
|
|
iam_keys = create_user_with_access_key_and_policy()
|
|
|
|
# Create the bucket with correct credentials
|
|
s3_client = boto3.client(
|
|
"s3",
|
|
aws_access_key_id=iam_keys["AccessKeyId"],
|
|
aws_secret_access_key=iam_keys["SecretAccessKey"],
|
|
region_name=DEFAULT_REGION_NAME,
|
|
)
|
|
s3_client.create_bucket(Bucket="mock_bucket")
|
|
|
|
# Call head_bucket with incorrect credentials
|
|
with pytest.raises(ClientError) as ex:
|
|
my_head_bucket(
|
|
"mock_bucket",
|
|
aws_access_key_id=iam_keys["AccessKeyId"],
|
|
aws_secret_access_key="invalid",
|
|
)
|
|
err = ex.value.response["Error"]
|
|
assert err["Code"] == "SignatureDoesNotMatch"
|
|
assert err["Message"] == (
|
|
"The request signature we calculated does not match the signature you provided. "
|
|
"Check your key and signing method."
|
|
)
|
|
|
|
|
|
def my_head_bucket(bucket, aws_access_key_id, aws_secret_access_key):
|
|
s3_client = boto3.client(
|
|
"s3",
|
|
aws_access_key_id=aws_access_key_id,
|
|
aws_secret_access_key=aws_secret_access_key,
|
|
)
|
|
s3_client.head_bucket(Bucket=bucket)
|
|
|
|
|
|
def create_user_with_access_key_and_policy(user_name="test-user"):
|
|
"""
|
|
Should create a user with attached policy allowing read/write operations on S3.
|
|
"""
|
|
policy_document = {
|
|
"Version": "2012-10-17",
|
|
"Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}],
|
|
}
|
|
|
|
# Create client and user
|
|
client = boto3.client("iam", region_name="us-east-1")
|
|
client.create_user(UserName=user_name)
|
|
|
|
# Create and attach the policy
|
|
policy_arn = client.create_policy(
|
|
PolicyName="policy1", PolicyDocument=json.dumps(policy_document)
|
|
)["Policy"]["Arn"]
|
|
client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
|
|
|
|
# Return the access keys
|
|
return client.create_access_key(UserName=user_name)["AccessKey"]
|
|
|
|
|
|
def create_role_with_attached_policy_and_assume_it(
|
|
role_name,
|
|
trust_policy_document,
|
|
policy_document,
|
|
session_name="session1",
|
|
policy_name="policy1",
|
|
):
|
|
iam_client = boto3.client("iam", region_name="us-east-1")
|
|
sts_client = boto3.client("sts", region_name="us-east-1")
|
|
role_arn = iam_client.create_role(
|
|
RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy_document)
|
|
)["Role"]["Arn"]
|
|
policy_arn = iam_client.create_policy(
|
|
PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
|
|
)["Policy"]["Arn"]
|
|
iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
|
|
return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)[
|
|
"Credentials"
|
|
]
|
|
|
|
|
|
@set_initial_no_auth_action_count(7)
|
|
@mock_aws
|
|
def test_delete_objects_without_access_throws_custom_error():
|
|
if not settings.TEST_DECORATOR_MODE:
|
|
raise SkipTest("Auth decorator does not work in server mode")
|
|
|
|
role_name = "some-test-role"
|
|
bucket_name = "some-test-bucket"
|
|
|
|
# Setup Bucket
|
|
client = boto3.client("s3", region_name="us-east-1")
|
|
client.create_bucket(Bucket=bucket_name)
|
|
client.put_object(Bucket=bucket_name, Key="some/prefix/test_file.txt")
|
|
|
|
trust_policy_document = {
|
|
"Version": "2012-10-17",
|
|
"Statement": {
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
|
|
"Action": "sts:AssumeRole",
|
|
},
|
|
}
|
|
|
|
# Setup User with the correct access
|
|
policy_document = {
|
|
"Version": "2012-10-17",
|
|
"Statement": [
|
|
{
|
|
"Effect": "Allow",
|
|
"Action": ["s3:ListBucket", "s3:GetBucketLocation"],
|
|
"Resource": f"arn:aws:s3:::{bucket_name}",
|
|
},
|
|
{
|
|
"Effect": "Allow",
|
|
"Action": ["s3:PutObject", "s3:GetObject"],
|
|
"Resource": f"arn:aws:s3:::{bucket_name}/*",
|
|
},
|
|
],
|
|
}
|
|
credentials = create_role_with_attached_policy_and_assume_it(
|
|
role_name, trust_policy_document, policy_document
|
|
)
|
|
|
|
session = boto3.session.Session(region_name="us-east-1")
|
|
s3_resource = session.resource(
|
|
"s3",
|
|
aws_access_key_id=credentials["AccessKeyId"],
|
|
aws_secret_access_key=credentials["SecretAccessKey"],
|
|
aws_session_token=credentials["SessionToken"],
|
|
)
|
|
bucket = s3_resource.Bucket(bucket_name)
|
|
|
|
# This action is not allowed.
|
|
# It should return a 200-response, with the body indicating that we
|
|
# do not have access.
|
|
response = bucket.objects.filter(Prefix="some/prefix").delete()[0]
|
|
assert len(response["Errors"]) == 1
|
|
|
|
error = response["Errors"][0]
|
|
assert error["Key"] == "some/prefix/test_file.txt"
|
|
assert error["Code"] == "AccessDenied"
|
|
assert error["Message"] == "Access Denied"
|