S3Control - Access Points

This commit is contained in:
Bert Blommers 2022-02-24 19:07:54 -01:00
parent 8b81481d3e
commit 6733947a8c
8 changed files with 569 additions and 23 deletions

View File

@ -0,0 +1,44 @@
"""Exceptions raised by the s3control service."""
from moto.core.exceptions import RESTError
ERROR_WITH_ACCESS_POINT_NAME = """{% extends 'wrapped_single_error' %}
{% block extra %}<AccessPointName>{{ name }}</AccessPointName>{% endblock %}
"""
ERROR_WITH_ACCESS_POINT_POLICY = """{% extends 'wrapped_single_error' %}
{% block extra %}<AccessPointName>{{ name }}</AccessPointName>{% endblock %}
"""
class S3ControlError(RESTError):
def __init__(self, *args, **kwargs):
kwargs.setdefault("template", "single_error")
super().__init__(*args, **kwargs)
class AccessPointNotFound(S3ControlError):
code = 404
def __init__(self, name, **kwargs):
kwargs.setdefault("template", "ap_not_found")
kwargs["name"] = name
self.templates["ap_not_found"] = ERROR_WITH_ACCESS_POINT_NAME
super().__init__(
"NoSuchAccessPoint", "The specified accesspoint does not exist", **kwargs
)
class AccessPointPolicyNotFound(S3ControlError):
code = 404
def __init__(self, name, **kwargs):
kwargs.setdefault("template", "apf_not_found")
kwargs["name"] = name
self.templates["apf_not_found"] = ERROR_WITH_ACCESS_POINT_POLICY
super().__init__(
"NoSuchAccessPointPolicy",
"The specified accesspoint policy does not exist",
**kwargs
)

View File

@ -1,4 +1,7 @@
from moto.core import ACCOUNT_ID, BaseBackend from collections import defaultdict
from datetime import datetime
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import get_random_hex
from moto.s3.exceptions import ( from moto.s3.exceptions import (
WrongPublicAccessBlockAccountIdError, WrongPublicAccessBlockAccountIdError,
NoSuchPublicAccessBlockConfiguration, NoSuchPublicAccessBlockConfiguration,
@ -6,6 +9,38 @@ from moto.s3.exceptions import (
) )
from moto.s3.models import PublicAccessBlock from moto.s3.models import PublicAccessBlock
from .exceptions import AccessPointNotFound, AccessPointPolicyNotFound
class AccessPoint(BaseModel):
def __init__(
self, name, bucket, vpc_configuration, public_access_block_configuration
):
self.name = name
self.alias = f"{name}-{get_random_hex(34)}-s3alias"
self.bucket = bucket
self.created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
self.arn = f"arn:aws:s3:us-east-1:{ACCOUNT_ID}:accesspoint/{name}"
self.policy = None
self.network_origin = "VPC" if vpc_configuration else "Internet"
self.vpc_id = (vpc_configuration or {}).get("VpcId")
pubc = public_access_block_configuration or {}
self.pubc = {
"BlockPublicAcls": pubc.get("BlockPublicAcls", "true"),
"IgnorePublicAcls": pubc.get("IgnorePublicAcls", "true"),
"BlockPublicPolicy": pubc.get("BlockPublicPolicy", "true"),
"RestrictPublicBuckets": pubc.get("RestrictPublicBuckets", "true"),
}
def delete_policy(self):
self.policy = None
def set_policy(self, policy):
self.policy = policy
def has_policy(self):
return self.policy is not None
class S3ControlBackend(BaseBackend): class S3ControlBackend(BaseBackend):
""" """
@ -19,6 +54,7 @@ class S3ControlBackend(BaseBackend):
def __init__(self, region_name=None): def __init__(self, region_name=None):
self.region_name = region_name self.region_name = region_name
self.public_access_block = None self.public_access_block = None
self.access_points = defaultdict(dict)
def reset(self): def reset(self):
region_name = self.region_name region_name = self.region_name
@ -57,5 +93,48 @@ class S3ControlBackend(BaseBackend):
pub_block_config.get("RestrictPublicBuckets"), pub_block_config.get("RestrictPublicBuckets"),
) )
def create_access_point(
self,
account_id,
name,
bucket,
vpc_configuration,
public_access_block_configuration,
):
access_point = AccessPoint(
name, bucket, vpc_configuration, public_access_block_configuration
)
self.access_points[account_id][name] = access_point
return access_point
def delete_access_point(self, account_id, name):
self.access_points[account_id].pop(name, None)
def get_access_point(self, account_id, name):
if name not in self.access_points[account_id]:
raise AccessPointNotFound(name)
return self.access_points[account_id][name]
def create_access_point_policy(self, account_id, name, policy):
access_point = self.get_access_point(account_id, name)
access_point.set_policy(policy)
def get_access_point_policy(self, account_id, name):
access_point = self.get_access_point(account_id, name)
if access_point.has_policy():
return access_point.policy
raise AccessPointPolicyNotFound(name)
def delete_access_point_policy(self, account_id, name):
access_point = self.get_access_point(account_id, name)
access_point.delete_policy()
def get_access_point_policy_status(self, account_id, name):
"""
We assume the policy status is always public
"""
self.get_access_point_policy(account_id, name)
return True
s3control_backend = S3ControlBackend() s3control_backend = S3ControlBackend()

View File

@ -1,30 +1,38 @@
import json import json
import xmltodict import xmltodict
from functools import wraps
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.core.utils import amzn_request_id from moto.core.utils import amzn_request_id
from moto.s3.exceptions import S3ClientError from moto.s3.exceptions import S3ClientError
from moto.s3.responses import S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION from moto.s3.responses import S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION
from .exceptions import S3ControlError
from .models import s3control_backend from .models import s3control_backend
class S3ControlResponse(BaseResponse): def error_handler(f):
@classmethod @wraps(f)
def public_access_block(cls, request, full_url, headers): def _wrapper(*args, **kwargs):
response_instance = S3ControlResponse()
try: try:
return response_instance._public_access_block(request) return f(*args, **kwargs)
except S3ClientError as err: except S3ControlError as e:
return err.code, {}, err.description return e.code, e.get_headers(), e.get_body()
return _wrapper
class S3ControlResponse(BaseResponse):
@amzn_request_id @amzn_request_id
def _public_access_block(self, request): def public_access_block(self, request, full_url, headers):
try:
if request.method == "GET": if request.method == "GET":
return self.get_public_access_block(request) return self.get_public_access_block(request)
elif request.method == "PUT": elif request.method == "PUT":
return self.put_public_access_block(request) return self.put_public_access_block(request)
elif request.method == "DELETE": elif request.method == "DELETE":
return self.delete_public_access_block(request) return self.delete_public_access_block(request)
except S3ClientError as err:
return err.code, {}, err.description
def get_public_access_block(self, request): def get_public_access_block(self, request):
account_id = request.headers.get("x-amz-account-id") account_id = request.headers.get("x-amz-account-id")
@ -53,3 +61,181 @@ class S3ControlResponse(BaseResponse):
parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None) parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None)
return parsed_xml return parsed_xml
@error_handler
def access_point(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "PUT":
return self.create_access_point(full_url)
if request.method == "GET":
return self.get_access_point(full_url)
if request.method == "DELETE":
return self.delete_access_point(full_url)
@error_handler
def access_point_policy(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "PUT":
return self.create_access_point_policy(full_url)
if request.method == "GET":
return self.get_access_point_policy(full_url)
if request.method == "DELETE":
return self.delete_access_point_policy(full_url)
@error_handler
def access_point_policy_status(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "PUT":
return self.create_access_point(full_url)
if request.method == "GET":
return self.get_access_point_policy_status(full_url)
def create_access_point(self, full_url):
account_id, name = self._get_accountid_and_name_from_accesspoint(full_url)
params = xmltodict.parse(self.body)["CreateAccessPointRequest"]
bucket = params["Bucket"]
vpc_configuration = params.get("VpcConfiguration")
public_access_block_configuration = params.get("PublicAccessBlockConfiguration")
access_point = s3control_backend.create_access_point(
account_id=account_id,
name=name,
bucket=bucket,
vpc_configuration=vpc_configuration,
public_access_block_configuration=public_access_block_configuration,
)
template = self.response_template(CREATE_ACCESS_POINT_TEMPLATE)
return 200, {}, template.render(access_point=access_point)
def get_access_point(self, full_url):
account_id, name = self._get_accountid_and_name_from_accesspoint(full_url)
access_point = s3control_backend.get_access_point(
account_id=account_id, name=name,
)
template = self.response_template(GET_ACCESS_POINT_TEMPLATE)
return 200, {}, template.render(access_point=access_point)
def delete_access_point(self, full_url):
account_id, name = self._get_accountid_and_name_from_accesspoint(full_url)
s3control_backend.delete_access_point(
account_id=account_id, name=name,
)
return 204, {}, ""
def create_access_point_policy(self, full_url):
account_id, name = self._get_accountid_and_name_from_policy(full_url)
params = xmltodict.parse(self.body)
policy = params["PutAccessPointPolicyRequest"]["Policy"]
s3control_backend.create_access_point_policy(account_id, name, policy)
return 200, {}, ""
def get_access_point_policy(self, full_url):
account_id, name = self._get_accountid_and_name_from_policy(full_url)
policy = s3control_backend.get_access_point_policy(account_id, name)
template = self.response_template(GET_ACCESS_POINT_POLICY_TEMPLATE)
return 200, {}, template.render(policy=policy)
def delete_access_point_policy(self, full_url):
account_id, name = self._get_accountid_and_name_from_policy(full_url)
s3control_backend.delete_access_point_policy(
account_id=account_id, name=name,
)
return 204, {}, ""
def get_access_point_policy_status(self, full_url):
account_id, name = self._get_accountid_and_name_from_policy(full_url)
s3control_backend.get_access_point_policy_status(account_id, name)
template = self.response_template(GET_ACCESS_POINT_POLICY_STATUS_TEMPLATE)
return 200, {}, template.render()
def _get_accountid_and_name_from_accesspoint(self, full_url):
url = full_url
if full_url.startswith("http"):
url = full_url.split("://")[1]
account_id = url.split(".")[0]
name = url.split("v20180820/accesspoint/")[-1]
return account_id, name
def _get_accountid_and_name_from_policy(self, full_url):
url = full_url
if full_url.startswith("http"):
url = full_url.split("://")[1]
account_id = url.split(".")[0]
name = self.path.split("/")[-2]
return account_id, name
S3ControlResponseInstance = S3ControlResponse()
CREATE_ACCESS_POINT_TEMPLATE = """<CreateAccessPointResult>
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<Alias>{{ access_point.name }}</Alias>
<AccessPointArn>{{ access_point.arn }}</AccessPointArn>
</CreateAccessPointResult>
"""
GET_ACCESS_POINT_TEMPLATE = """<GetAccessPointResult>
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<Name>{{ access_point.name }}</Name>
<Bucket>{{ access_point.bucket }}</Bucket>
<NetworkOrigin>{{ access_point.network_origin }}</NetworkOrigin>
{% if access_point.vpc_id %}
<VpcConfiguration>
<VpcId>{{ access_point.vpc_id }}</VpcId>
</VpcConfiguration>
{% endif %}
<PublicAccessBlockConfiguration>
<BlockPublicAcls>{{ access_point.pubc["BlockPublicAcls"] }}</BlockPublicAcls>
<IgnorePublicAcls>{{ access_point.pubc["IgnorePublicAcls"] }}</IgnorePublicAcls>
<BlockPublicPolicy>{{ access_point.pubc["BlockPublicPolicy"] }}</BlockPublicPolicy>
<RestrictPublicBuckets>{{ access_point.pubc["RestrictPublicBuckets"] }}</RestrictPublicBuckets>
</PublicAccessBlockConfiguration>
<CreationDate>{{ access_point.created }}</CreationDate>
<Alias>{{ access_point.alias }}</Alias>
<AccessPointArn>{{ access_point.arn }}</AccessPointArn>
<Endpoints>
<entry>
<key>ipv4</key>
<value>s3-accesspoint.us-east-1.amazonaws.com</value>
</entry>
<entry>
<key>fips</key>
<value>s3-accesspoint-fips.us-east-1.amazonaws.com</value>
</entry>
<entry>
<key>fips_dualstack</key>
<value>s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com</value>
</entry>
<entry>
<key>dualstack</key>
<value>s3-accesspoint.dualstack.us-east-1.amazonaws.com</value>
</entry>
</Endpoints>
</GetAccessPointResult>
"""
GET_ACCESS_POINT_POLICY_TEMPLATE = """<GetAccessPointPolicyResult>
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<Policy>{{ policy }}</Policy>
</GetAccessPointPolicyResult>
"""
GET_ACCESS_POINT_POLICY_STATUS_TEMPLATE = """<GetAccessPointPolicyResult>
<ResponseMetadata>
<RequestId>1549581b-12b7-11e3-895e-1334aEXAMPLE</RequestId>
</ResponseMetadata>
<PolicyStatus>
<IsPublic>true</IsPublic>
</PolicyStatus>
</GetAccessPointPolicyResult>
"""

View File

@ -1,5 +1,5 @@
"""s3control base URL and path.""" """s3control base URL and path."""
from .responses import S3ControlResponse from .responses import S3ControlResponseInstance
url_bases = [ url_bases = [
r"https?://([0-9]+)\.s3-control\.(.+)\.amazonaws\.com", r"https?://([0-9]+)\.s3-control\.(.+)\.amazonaws\.com",
@ -7,5 +7,8 @@ url_bases = [
url_paths = { url_paths = {
"{0}/v20180820/configuration/publicAccessBlock$": S3ControlResponse.public_access_block, "{0}/v20180820/configuration/publicAccessBlock$": S3ControlResponseInstance.public_access_block,
"{0}/v20180820/accesspoint/(?P<name>[\w_:%-]+)$": S3ControlResponseInstance.access_point,
"{0}/v20180820/accesspoint/(?P<name>[\w_:%-]+)/policy$": S3ControlResponseInstance.access_point_policy,
"{0}/v20180820/accesspoint/(?P<name>[\w_:%-]+)/policyStatus$": S3ControlResponseInstance.access_point_policy_status,
} }

View File

@ -9,8 +9,4 @@ TestAccDataSourceAwsNetworkInterface_CarrierIPAssociation
TestAccAWSRouteTable_IPv4_To_LocalGateway TestAccAWSRouteTable_IPv4_To_LocalGateway
TestAccAWSRouteTable_IPv4_To_VpcEndpoint TestAccAWSRouteTable_IPv4_To_VpcEndpoint
TestAccAWSRouteTable_VpcClassicLink TestAccAWSRouteTable_VpcClassicLink
TestAccAWSS3BucketObject_NonVersioned
TestAccAWSS3BucketObject_ignoreTags
TestAccAWSS3BucketObject_updatesWithVersioningViaAccessPoint TestAccAWSS3BucketObject_updatesWithVersioningViaAccessPoint
TestAccAWSS3BucketObject_updates
TestAccAWSS3BucketObject_updatesWithVersioning

View File

@ -120,7 +120,10 @@ TestAccAWSENI_Tags
TestAccAWSENI_basic TestAccAWSENI_basic
TestAccAWSENI_IPv6 TestAccAWSENI_IPv6
TestAccAWSENI_disappears TestAccAWSENI_disappears
TestAccAWSS3BucketObject_ TestAccAWSS3BucketObject
TestAccAWSS3BucketPolicy
TestAccAWSS3AccessPoint
TestAccAWSS3BucketPublicAccessBlock
TestAccAWSS3ObjectCopy TestAccAWSS3ObjectCopy
TestAccAWSIAMPolicy_ TestAccAWSIAMPolicy_
TestAccAWSIAMGroup_ TestAccAWSIAMGroup_

View File

@ -0,0 +1,119 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from moto import mock_s3control
from moto.core import ACCOUNT_ID
@mock_s3control
def test_create_access_point():
client = boto3.client("s3control", region_name="eu-west-1")
resp = client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
resp.should.have.key("AccessPointArn")
resp.should.have.key("Alias").equals("ap_name")
@mock_s3control
def test_get_unknown_access_point():
client = boto3.client("s3control", region_name="ap-southeast-1")
with pytest.raises(ClientError) as exc:
client.get_access_point(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPoint")
err["Message"].should.equal("The specified accesspoint does not exist")
err["AccessPointName"].should.equal("ap_name")
@mock_s3control
def test_get_access_point_minimal():
client = boto3.client("s3control", region_name="ap-southeast-1")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
resp = client.get_access_point(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Name").equals("ap_name")
resp.should.have.key("Bucket").equals("mybucket")
resp.should.have.key("NetworkOrigin").equals("Internet")
resp.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
)
resp.should.have.key("CreationDate")
resp.should.have.key("Alias").match("ap_name-[a-z0-9]+-s3alias")
resp.should.have.key("AccessPointArn").equals(
f"arn:aws:s3:us-east-1:{ACCOUNT_ID}:accesspoint/ap_name"
)
resp.should.have.key("Endpoints")
resp["Endpoints"].should.have.key("ipv4").equals(
"s3-accesspoint.us-east-1.amazonaws.com"
)
resp["Endpoints"].should.have.key("fips").equals(
"s3-accesspoint-fips.us-east-1.amazonaws.com"
)
resp["Endpoints"].should.have.key("fips_dualstack").equals(
"s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com"
)
resp["Endpoints"].should.have.key("dualstack").equals(
"s3-accesspoint.dualstack.us-east-1.amazonaws.com"
)
@mock_s3control
def test_get_access_point_full():
client = boto3.client("s3control", region_name="ap-southeast-1")
client.create_access_point(
AccountId="111111111111",
Name="ap_name",
Bucket="mybucket",
VpcConfiguration={"VpcId": "sth"},
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
resp = client.get_access_point(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Name").equals("ap_name")
resp.should.have.key("Bucket").equals("mybucket")
resp.should.have.key("NetworkOrigin").equals("VPC")
resp.should.have.key("VpcConfiguration").equals({"VpcId": "sth"})
resp.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
)
@mock_s3control
def test_delete_access_point():
client = boto3.client("s3control", region_name="ap-southeast-1")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
client.delete_access_point(AccountId="111111111111", Name="ap_name")
with pytest.raises(ClientError) as exc:
client.get_access_point(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPoint")

View File

@ -0,0 +1,116 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError
from moto import mock_s3control
@mock_s3control
def test_get_access_point_policy():
client = boto3.client("s3control", region_name="us-west-2")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
policy = """{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": "s3:GetObjectTagging",
"Resource": "arn:aws:s3:us-east-1:123456789012:accesspoint/mybucket/object/*",
"Principal": {
"AWS": "*"
}
}
]
}"""
client.put_access_point_policy(
AccountId="111111111111", Name="ap_name", Policy=policy
)
resp = client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Policy").equals(policy)
@mock_s3control
def test_get_unknown_access_point_policy():
client = boto3.client("s3control", region_name="ap-southeast-1")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
with pytest.raises(ClientError) as exc:
client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy")
err["Message"].should.equal("The specified accesspoint policy does not exist")
err["AccessPointName"].should.equal("ap_name")
@mock_s3control
def test_get_access_point_policy_status():
client = boto3.client("s3control", region_name="us-west-2")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
policy = """{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": "s3:GetObjectTagging",
"Resource": "arn:aws:s3:us-east-1:123456789012:accesspoint/mybucket/object/*",
"Principal": {
"AWS": "*"
}
}
]
}"""
client.put_access_point_policy(
AccountId="111111111111", Name="ap_name", Policy=policy
)
resp = client.get_access_point_policy_status(
AccountId="111111111111", Name="ap_name"
)
resp.should.have.key("PolicyStatus").equals({"IsPublic": True})
@mock_s3control
def test_delete_access_point_policy():
client = boto3.client("s3control", region_name="us-west-2")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
policy = """some json policy"""
client.put_access_point_policy(
AccountId="111111111111", Name="ap_name", Policy=policy
)
client.delete_access_point_policy(AccountId="111111111111", Name="ap_name")
with pytest.raises(ClientError) as exc:
client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy")
@mock_s3control
def test_get_unknown_access_point_policy_status():
client = boto3.client("s3control", region_name="ap-southeast-1")
client.create_access_point(
AccountId="111111111111", Name="ap_name", Bucket="mybucket",
)
with pytest.raises(ClientError) as exc:
client.get_access_point_policy_status(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy")
err["Message"].should.equal("The specified accesspoint policy does not exist")
err["AccessPointName"].should.equal("ap_name")