2022-02-24 20:07:54 +00:00
|
|
|
import boto3
|
|
|
|
import pytest
|
|
|
|
import sure # noqa # pylint: disable=unused-import
|
|
|
|
|
|
|
|
from botocore.client import ClientError
|
|
|
|
from moto import mock_s3control
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3control
|
|
|
|
def test_get_access_point_policy():
|
|
|
|
client = boto3.client("s3control", region_name="us-west-2")
|
|
|
|
client.create_access_point(
|
2022-03-10 14:39:59 +00:00
|
|
|
AccountId="111111111111", Name="ap_name", Bucket="mybucket"
|
2022-02-24 20:07:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
policy = """{
|
|
|
|
"Version": "2012-10-17",
|
|
|
|
"Statement": [
|
|
|
|
{
|
|
|
|
"Sid": "",
|
|
|
|
"Effect": "Allow",
|
|
|
|
"Action": "s3:GetObjectTagging",
|
|
|
|
"Resource": "arn:aws:s3:us-east-1:123456789012:accesspoint/mybucket/object/*",
|
|
|
|
"Principal": {
|
|
|
|
"AWS": "*"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
]
|
|
|
|
}"""
|
|
|
|
client.put_access_point_policy(
|
|
|
|
AccountId="111111111111", Name="ap_name", Policy=policy
|
|
|
|
)
|
|
|
|
|
|
|
|
resp = client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
|
|
|
|
resp.should.have.key("Policy").equals(policy)
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3control
|
|
|
|
def test_get_unknown_access_point_policy():
|
|
|
|
client = boto3.client("s3control", region_name="ap-southeast-1")
|
|
|
|
client.create_access_point(
|
2022-03-10 14:39:59 +00:00
|
|
|
AccountId="111111111111", Name="ap_name", Bucket="mybucket"
|
2022-02-24 20:07:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
with pytest.raises(ClientError) as exc:
|
|
|
|
client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
|
|
|
|
err = exc.value.response["Error"]
|
|
|
|
err["Code"].should.equal("NoSuchAccessPointPolicy")
|
|
|
|
err["Message"].should.equal("The specified accesspoint policy does not exist")
|
|
|
|
err["AccessPointName"].should.equal("ap_name")
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3control
|
|
|
|
def test_get_access_point_policy_status():
|
|
|
|
client = boto3.client("s3control", region_name="us-west-2")
|
|
|
|
client.create_access_point(
|
2022-03-10 14:39:59 +00:00
|
|
|
AccountId="111111111111", Name="ap_name", Bucket="mybucket"
|
2022-02-24 20:07:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
policy = """{
|
|
|
|
"Version": "2012-10-17",
|
|
|
|
"Statement": [
|
|
|
|
{
|
|
|
|
"Sid": "",
|
|
|
|
"Effect": "Allow",
|
|
|
|
"Action": "s3:GetObjectTagging",
|
|
|
|
"Resource": "arn:aws:s3:us-east-1:123456789012:accesspoint/mybucket/object/*",
|
|
|
|
"Principal": {
|
|
|
|
"AWS": "*"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
]
|
|
|
|
}"""
|
|
|
|
client.put_access_point_policy(
|
|
|
|
AccountId="111111111111", Name="ap_name", Policy=policy
|
|
|
|
)
|
|
|
|
|
|
|
|
resp = client.get_access_point_policy_status(
|
|
|
|
AccountId="111111111111", Name="ap_name"
|
|
|
|
)
|
|
|
|
resp.should.have.key("PolicyStatus").equals({"IsPublic": True})
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3control
|
|
|
|
def test_delete_access_point_policy():
|
|
|
|
client = boto3.client("s3control", region_name="us-west-2")
|
|
|
|
client.create_access_point(
|
2022-03-10 14:39:59 +00:00
|
|
|
AccountId="111111111111", Name="ap_name", Bucket="mybucket"
|
2022-02-24 20:07:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
policy = """some json policy"""
|
|
|
|
client.put_access_point_policy(
|
|
|
|
AccountId="111111111111", Name="ap_name", Policy=policy
|
|
|
|
)
|
|
|
|
|
|
|
|
client.delete_access_point_policy(AccountId="111111111111", Name="ap_name")
|
|
|
|
|
|
|
|
with pytest.raises(ClientError) as exc:
|
|
|
|
client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
|
|
|
|
err = exc.value.response["Error"]
|
|
|
|
err["Code"].should.equal("NoSuchAccessPointPolicy")
|
|
|
|
|
|
|
|
|
|
|
|
@mock_s3control
|
|
|
|
def test_get_unknown_access_point_policy_status():
|
|
|
|
client = boto3.client("s3control", region_name="ap-southeast-1")
|
|
|
|
client.create_access_point(
|
2022-03-10 14:39:59 +00:00
|
|
|
AccountId="111111111111", Name="ap_name", Bucket="mybucket"
|
2022-02-24 20:07:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
with pytest.raises(ClientError) as exc:
|
|
|
|
client.get_access_point_policy_status(AccountId="111111111111", Name="ap_name")
|
|
|
|
err = exc.value.response["Error"]
|
|
|
|
err["Code"].should.equal("NoSuchAccessPointPolicy")
|
|
|
|
err["Message"].should.equal("The specified accesspoint policy does not exist")
|
|
|
|
err["AccessPointName"].should.equal("ap_name")
|