moto/moto/s3control/models.py
2024-01-27 19:38:09 +00:00

149 lines
5.2 KiB
Python

from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.moto_api._internal import mock_random
from moto.s3.exceptions import (
InvalidPublicAccessBlockConfiguration,
NoSuchPublicAccessBlockConfiguration,
WrongPublicAccessBlockAccountIdError,
)
from moto.s3.models import PublicAccessBlock
from .exceptions import AccessPointNotFound, AccessPointPolicyNotFound
class AccessPoint(BaseModel):
    """In-memory model of a single S3 access point.

    Holds the attributes derived at creation time (alias, ARN, creation
    timestamp, network origin, public-access-block flags) together with an
    optional access point policy that can be set and cleared afterwards.
    """

    def __init__(
        self,
        account_id: str,
        name: str,
        bucket: str,
        vpc_configuration: Dict[str, Any],
        public_access_block_configuration: Dict[str, Any],
    ):
        """Build the access point from the creation request values.

        :param account_id: account the access point belongs to; embedded in the ARN.
        :param name: access point name; used in both the alias and the ARN.
        :param bucket: name of the bucket the access point is attached to.
        :param vpc_configuration: optional mapping; a truthy value marks the
            access point as VPC-restricted and supplies ``VpcId``.
        :param public_access_block_configuration: optional mapping with any of
            the four public-access-block flags; missing flags default to ``"true"``.
        """
        # Falsy (None / empty) configurations are treated as "nothing supplied".
        vpc_settings = vpc_configuration or {}
        requested_flags = public_access_block_configuration or {}

        self.name = name
        random_part = mock_random.get_random_hex(34)
        self.alias = f"{name}-{random_part}-s3alias"
        self.bucket = bucket
        # NOTE(review): datetime.now() yields naive local time, not UTC -
        # confirm this is what consumers of `created` expect.
        self.created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
        # The region component of the ARN is fixed to us-east-1.
        self.arn = f"arn:aws:s3:us-east-1:{account_id}:accesspoint/{name}"
        self.policy: Optional[str] = None

        if vpc_settings:
            self.network_origin = "VPC"
        else:
            self.network_origin = "Internet"
        self.vpc_id = vpc_settings.get("VpcId")

        # Every flag that the caller did not provide falls back to the string "true".
        flag_names = (
            "BlockPublicAcls",
            "IgnorePublicAcls",
            "BlockPublicPolicy",
            "RestrictPublicBuckets",
        )
        self.pubc = {
            flag: requested_flags.get(flag, "true") for flag in flag_names
        }

    def delete_policy(self) -> None:
        """Remove the policy attached to this access point, if any."""
        self.policy = None

    def set_policy(self, policy: str) -> None:
        """Attach ``policy`` to this access point, replacing any previous one."""
        self.policy = policy

    def has_policy(self) -> bool:
        """Return ``True`` when a policy is currently attached."""
        if self.policy is None:
            return False
        return True
class S3ControlBackend(BaseBackend):
    """Backend holding the mocked S3 Control state.

    Tracks one optional account-level public access block and a mapping of
    ``account_id -> access point name -> AccessPoint``.
    """

    def __init__(self, region_name: str, account_id: str):
        """Initialise an empty backend for the given region and account."""
        super().__init__(region_name, account_id)
        self.public_access_block: Optional[PublicAccessBlock] = None
        # Per-account dictionaries are created lazily on first access.
        self.access_points: Dict[str, Dict[str, AccessPoint]] = defaultdict(dict)

    def _require_own_account(self, account_id: str) -> None:
        """Reject requests whose account ID differs from the one set for Moto.

        :raises WrongPublicAccessBlockAccountIdError: when ``account_id`` does
            not equal ``self.account_id``.
        """
        if account_id == self.account_id:
            return
        raise WrongPublicAccessBlockAccountIdError()

    def get_public_access_block(self, account_id: str) -> PublicAccessBlock:
        """Return the stored public access block configuration.

        :raises WrongPublicAccessBlockAccountIdError: on an account ID mismatch.
        :raises NoSuchPublicAccessBlockConfiguration: when nothing is stored.
        """
        self._require_own_account(account_id)
        stored_block = self.public_access_block
        if stored_block:
            return stored_block
        raise NoSuchPublicAccessBlockConfiguration()

    def delete_public_access_block(self, account_id: str) -> None:
        """Clear the stored public access block configuration.

        :raises WrongPublicAccessBlockAccountIdError: on an account ID mismatch.
        """
        self._require_own_account(account_id)
        self.public_access_block = None

    def put_public_access_block(
        self, account_id: str, pub_block_config: Dict[str, Any]
    ) -> None:
        """Store a new public access block built from ``pub_block_config``.

        :raises WrongPublicAccessBlockAccountIdError: on an account ID mismatch.
        :raises InvalidPublicAccessBlockConfiguration: when the configuration
            is empty or missing.
        """
        self._require_own_account(account_id)
        if not pub_block_config:
            raise InvalidPublicAccessBlockConfiguration()

        # Positional order expected by PublicAccessBlock; absent keys become None.
        ordered_keys = (
            "BlockPublicAcls",
            "IgnorePublicAcls",
            "BlockPublicPolicy",
            "RestrictPublicBuckets",
        )
        flag_values = [pub_block_config.get(key) for key in ordered_keys]
        self.public_access_block = PublicAccessBlock(*flag_values)

    def create_access_point(
        self,
        account_id: str,
        name: str,
        bucket: str,
        vpc_configuration: Dict[str, Any],
        public_access_block_configuration: Dict[str, Any],
    ) -> AccessPoint:
        """Create an access point and register it under ``account_id``.

        An existing access point with the same name for that account is
        overwritten. Returns the newly created ``AccessPoint``.
        """
        new_point = AccessPoint(
            account_id=account_id,
            name=name,
            bucket=bucket,
            vpc_configuration=vpc_configuration,
            public_access_block_configuration=public_access_block_configuration,
        )
        account_points = self.access_points[account_id]
        account_points[name] = new_point
        return new_point

    def delete_access_point(self, account_id: str, name: str) -> None:
        """Remove the named access point; unknown names are silently ignored."""
        account_points = self.access_points[account_id]
        if name in account_points:
            del account_points[name]

    def get_access_point(self, account_id: str, name: str) -> AccessPoint:
        """Look up an access point by account and name.

        :raises AccessPointNotFound: when no such access point is registered.
        """
        account_points = self.access_points[account_id]
        if name in account_points:
            return account_points[name]
        raise AccessPointNotFound(name)

    def create_access_point_policy(
        self, account_id: str, name: str, policy: str
    ) -> None:
        """Attach ``policy`` to an existing access point.

        :raises AccessPointNotFound: when the access point does not exist.
        """
        self.get_access_point(account_id, name).set_policy(policy)

    def get_access_point_policy(self, account_id: str, name: str) -> str:
        """Return the policy attached to an access point.

        :raises AccessPointNotFound: when the access point does not exist.
        :raises AccessPointPolicyNotFound: when it has no policy attached.
        """
        target = self.get_access_point(account_id, name)
        if not target.has_policy():
            raise AccessPointPolicyNotFound(name)
        return target.policy  # type: ignore[return-value]

    def delete_access_point_policy(self, account_id: str, name: str) -> None:
        """Detach the policy from an existing access point.

        :raises AccessPointNotFound: when the access point does not exist.
        """
        self.get_access_point(account_id, name).delete_policy()

    def get_access_point_policy_status(self, account_id: str, name: str) -> bool:
        """
        We assume the policy status is always public
        """
        # Called only for its validation: raises if the access point or its
        # policy is missing.
        self.get_access_point_policy(account_id, name)
        return True
# Module-level registry of S3ControlBackend instances for the "s3control"
# service. It is built with boto3 region discovery switched off and "global"
# as the only explicitly added region.
# NOTE(review): presumably this means callers look the backend up under the
# "global" region rather than a real AWS region - confirm against BackendDict.
s3control_backends = BackendDict(
S3ControlBackend,
"s3control",
use_boto3_regions=False,
additional_regions=["global"],
)