Separate S3Control methods from S3 (#4745)

This commit is contained in:
Bert Blommers 2022-01-18 19:10:22 -01:00 committed by GitHub
parent 9c8744ff64
commit 6610862a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 953 additions and 659 deletions

View File

@ -4318,6 +4318,69 @@
- [ ] write_get_object_response
</details>
## s3control
<details>
<summary>5% implemented</summary>
- [ ] create_access_point
- [ ] create_access_point_for_object_lambda
- [ ] create_bucket
- [ ] create_job
- [ ] create_multi_region_access_point
- [ ] delete_access_point
- [ ] delete_access_point_for_object_lambda
- [ ] delete_access_point_policy
- [ ] delete_access_point_policy_for_object_lambda
- [ ] delete_bucket
- [ ] delete_bucket_lifecycle_configuration
- [ ] delete_bucket_policy
- [ ] delete_bucket_tagging
- [ ] delete_job_tagging
- [ ] delete_multi_region_access_point
- [X] delete_public_access_block
- [ ] delete_storage_lens_configuration
- [ ] delete_storage_lens_configuration_tagging
- [ ] describe_job
- [ ] describe_multi_region_access_point_operation
- [ ] get_access_point
- [ ] get_access_point_configuration_for_object_lambda
- [ ] get_access_point_for_object_lambda
- [ ] get_access_point_policy
- [ ] get_access_point_policy_for_object_lambda
- [ ] get_access_point_policy_status
- [ ] get_access_point_policy_status_for_object_lambda
- [ ] get_bucket
- [ ] get_bucket_lifecycle_configuration
- [ ] get_bucket_policy
- [ ] get_bucket_tagging
- [ ] get_job_tagging
- [ ] get_multi_region_access_point
- [ ] get_multi_region_access_point_policy
- [ ] get_multi_region_access_point_policy_status
- [X] get_public_access_block
- [ ] get_storage_lens_configuration
- [ ] get_storage_lens_configuration_tagging
- [ ] list_access_points
- [ ] list_access_points_for_object_lambda
- [ ] list_jobs
- [ ] list_multi_region_access_points
- [ ] list_regional_buckets
- [ ] list_storage_lens_configurations
- [ ] put_access_point_configuration_for_object_lambda
- [ ] put_access_point_policy
- [ ] put_access_point_policy_for_object_lambda
- [ ] put_bucket_lifecycle_configuration
- [ ] put_bucket_policy
- [ ] put_bucket_tagging
- [ ] put_job_tagging
- [ ] put_multi_region_access_point_policy
- [X] put_public_access_block
- [ ] put_storage_lens_configuration
- [ ] put_storage_lens_configuration_tagging
- [ ] update_job_priority
- [ ] update_job_status
</details>
## sagemaker
<details>
<summary>15% implemented</summary>
@ -5343,7 +5406,6 @@
- route53-recovery-readiness
- route53domains
- rum
- s3control
- s3outposts
- sagemaker-a2i-runtime
- sagemaker-edge

View File

@ -0,0 +1,87 @@
.. _implementedservice_s3control:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
=========
s3control
=========
.. autoclass:: moto.s3control.models.S3ControlBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_s3control
def test_s3control_behaviour:
boto3.client("s3control")
...
|start-h3| Implemented features for this service |end-h3|
- [ ] create_access_point
- [ ] create_access_point_for_object_lambda
- [ ] create_bucket
- [ ] create_job
- [ ] create_multi_region_access_point
- [ ] delete_access_point
- [ ] delete_access_point_for_object_lambda
- [ ] delete_access_point_policy
- [ ] delete_access_point_policy_for_object_lambda
- [ ] delete_bucket
- [ ] delete_bucket_lifecycle_configuration
- [ ] delete_bucket_policy
- [ ] delete_bucket_tagging
- [ ] delete_job_tagging
- [ ] delete_multi_region_access_point
- [X] delete_public_access_block
- [ ] delete_storage_lens_configuration
- [ ] delete_storage_lens_configuration_tagging
- [ ] describe_job
- [ ] describe_multi_region_access_point_operation
- [ ] get_access_point
- [ ] get_access_point_configuration_for_object_lambda
- [ ] get_access_point_for_object_lambda
- [ ] get_access_point_policy
- [ ] get_access_point_policy_for_object_lambda
- [ ] get_access_point_policy_status
- [ ] get_access_point_policy_status_for_object_lambda
- [ ] get_bucket
- [ ] get_bucket_lifecycle_configuration
- [ ] get_bucket_policy
- [ ] get_bucket_tagging
- [ ] get_job_tagging
- [ ] get_multi_region_access_point
- [ ] get_multi_region_access_point_policy
- [ ] get_multi_region_access_point_policy_status
- [X] get_public_access_block
- [ ] get_storage_lens_configuration
- [ ] get_storage_lens_configuration_tagging
- [ ] list_access_points
- [ ] list_access_points_for_object_lambda
- [ ] list_jobs
- [ ] list_multi_region_access_points
- [ ] list_regional_buckets
- [ ] list_storage_lens_configurations
- [ ] put_access_point_configuration_for_object_lambda
- [ ] put_access_point_policy
- [ ] put_access_point_policy_for_object_lambda
- [ ] put_bucket_lifecycle_configuration
- [ ] put_bucket_policy
- [ ] put_bucket_tagging
- [ ] put_job_tagging
- [ ] put_multi_region_access_point_policy
- [X] put_public_access_block
- [ ] put_storage_lens_configuration
- [ ] put_storage_lens_configuration_tagging
- [ ] update_job_priority
- [ ] update_job_status

View File

@ -102,6 +102,7 @@ mock_route53resolver = lazy_load(
".route53resolver", "mock_route53resolver", boto3_name="route53resolver"
)
mock_s3 = lazy_load(".s3", "mock_s3")
mock_s3control = lazy_load(".s3control", "mock_s3control")
mock_sagemaker = lazy_load(".sagemaker", "mock_sagemaker")
mock_secretsmanager = lazy_load(".secretsmanager", "mock_secretsmanager")
mock_ses = lazy_load(".ses", "mock_ses")

View File

@ -108,13 +108,17 @@ backend_url_patterns = [
"route53resolver",
re.compile("https?://route53resolver\\.(.+)\\.amazonaws\\.com"),
),
("s3", re.compile("https?://s3(.*)\\.amazonaws.com")),
("s3", re.compile("https?://s3(?!-control)(.*)\\.amazonaws.com")),
(
"s3",
re.compile(
"https?://(?P<bucket_name>[a-zA-Z0-9\\-_.]*)\\.?s3(.*)\\.amazonaws.com"
"https?://(?P<bucket_name>[a-zA-Z0-9\\-_.]*)\\.?s3(?!-control)(.*)\\.amazonaws.com"
),
),
(
"s3control",
re.compile("https?://([0-9]+)\\.s3-control\\.(.+)\\.amazonaws\\.com"),
),
("sagemaker", re.compile("https?://api.sagemaker\\.(.+)\\.amazonaws.com")),
("sdb", re.compile("https?://sdb\\.(.+)\\.amazonaws\\.com")),
("secretsmanager", re.compile("https?://secretsmanager\\.(.+)\\.amazonaws\\.com")),

View File

@ -54,7 +54,8 @@ from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.core.responses import AWSServiceSpec
from moto.core.utils import BackendDict
from moto.iam.config import role_config_query, policy_config_query
from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.s3.config import s3_config_query
from moto.s3control.config import s3_account_public_access_block_query
from moto.utilities.utils import load_resource

View File

@ -1,13 +1,8 @@
import datetime
import json
import time
from boto3 import Session
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.s3 import s3_backends
from moto.s3.models import get_moto_s3_account_id
class S3ConfigQuery(ConfigQueryModel):
@ -124,147 +119,4 @@ class S3ConfigQuery(ConfigQueryModel):
return config_data
class S3AccountPublicAccessBlockConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# For the Account Public Access Block, they are the same for all regions. The resource ID is the AWS account ID
# There is no resource name -- it should be a blank string "" if provided.
# The resource name can only ever be None or an empty string:
if resource_name is not None and resource_name != "":
return [], None
pab = None
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# If a resource ID was passed in, then filter accordingly:
if resource_ids:
for resource_id in resource_ids:
if account_id == resource_id:
pab = self.backends["global"].account_public_access_block
break
# Otherwise, just grab the one from the backend:
if not resource_ids:
pab = self.backends["global"].account_public_access_block
# If it's not present, then return nothing
if not pab:
return [], None
# Filter on regions (and paginate on them as well):
if backend_region:
pab_list = [backend_region]
elif resource_region:
# Invalid region?
if resource_region not in regions:
return [], None
pab_list = [resource_region]
# Aggregated query where no regions were supplied so return them all:
else:
pab_list = regions
# Pagination logic:
sorted_regions = sorted(pab_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the region-name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_regions:
raise InvalidNextTokenException()
start = sorted_regions.index(next_token)
# Get the list of items to collect:
pab_list = sorted_regions[start : (start + limit)]
if len(sorted_regions) > (start + limit):
new_token = sorted_regions[start + limit]
return (
[
{
"type": "AWS::S3::AccountPublicAccessBlock",
"id": account_id,
"region": region,
}
for region in pab_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# Do we even have this defined?
if not self.backends["global"].account_public_access_block:
return None
# Resource name can only ever be "" if it's supplied:
if resource_name is not None and resource_name != "":
return None
# Are we filtering based on region?
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# Is the resource ID correct?:
if account_id == resource_id:
if backend_region:
pab_region = backend_region
# Invalid region?
elif resource_region not in regions:
return None
else:
pab_region = resource_region
else:
return None
# Format the PAB to the AWS Config format:
creation_time = datetime.datetime.utcnow()
config_data = {
"version": "1.3",
"accountId": account_id,
"configurationItemCaptureTime": str(creation_time),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(creation_time.timetuple()))
), # PY2 and 3 compatible
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": account_id,
"awsRegion": pab_region,
"availabilityZone": "Not Applicable",
"configuration": self.backends[
"global"
].account_public_access_block.to_config_dict(),
"supplementaryConfiguration": {},
}
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
return config_data
s3_config_query = S3ConfigQuery(s3_backends)
s3_account_public_access_block_query = S3AccountPublicAccessBlockConfigQuery(
s3_backends
)

View File

@ -50,7 +50,6 @@ from moto.s3.exceptions import (
CrossLocationLoggingProhibitted,
NoSuchPublicAccessBlockConfiguration,
InvalidPublicAccessBlockConfiguration,
WrongPublicAccessBlockAccountIdError,
NoSuchUpload,
ObjectLockConfigurationNotFoundError,
InvalidTagError,
@ -1330,7 +1329,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
def __init__(self):
self.buckets = {}
self.account_public_access_block = None
self.tagger = TaggingService()
@property
@ -1572,16 +1570,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
return bucket.public_access_block
def get_account_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not self.account_public_access_block:
raise NoSuchPublicAccessBlockConfiguration()
return self.account_public_access_block
def put_object(
self,
bucket_name,
@ -1768,13 +1756,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
bucket = self.get_bucket(bucket_name)
bucket.public_access_block = None
def delete_account_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
self.account_public_access_block = None
def put_bucket_notification_configuration(self, bucket_name, notification_config):
bucket = self.get_bucket(bucket_name)
bucket.set_notification_configuration(notification_config)
@ -1803,21 +1784,6 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
pub_block_config.get("RestrictPublicBuckets"),
)
def put_account_public_access_block(self, account_id, pub_block_config):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not pub_block_config:
raise InvalidPublicAccessBlockConfiguration()
self.account_public_access_block = PublicAccessBlock(
pub_block_config.get("BlockPublicAcls"),
pub_block_config.get("IgnorePublicAcls"),
pub_block_config.get("BlockPublicPolicy"),
pub_block_config.get("RestrictPublicBuckets"),
)
def initiate_multipart(self, bucket_name, key_name, metadata):
bucket = self.get_bucket(bucket_name)
new_multipart = FakeMultipart(key_name, metadata)

View File

@ -3,8 +3,6 @@ import os
import re
from typing import List, Union
from botocore.awsrequest import AWSPreparedRequest
from moto import settings
from moto.core.utils import amzn_request_id, str_to_rfc_1123_datetime
from urllib.parse import (
@ -261,7 +259,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
# Depending on which calling format the client is using, we don't know
# if this is a bucket or key request so we have to check
if self.subdomain_based_buckets(request):
return self.key_or_control_response(request, full_url, headers)
return self.key_response(request, full_url, headers)
else:
# Using path-based buckets
return self.bucket_response(request, full_url, headers)
@ -1092,7 +1090,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return bytes(new_body)
@amzn_request_id
def key_or_control_response(self, request, full_url, headers):
def key_response(self, request, full_url, headers):
# Key and Control are lumped in because splitting out the regex is too much of a pain :/
self.method = request.method
self.path = self._get_path(request)
@ -1103,11 +1101,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
response_headers = {}
try:
# Is this an S3 control response?
if isinstance(request, AWSPreparedRequest) and "s3-control" in request.url:
response = self._control_response(request, full_url, headers)
else:
response = self._key_response(request, full_url, self.headers)
response = self._key_response(request, full_url, self.headers)
except S3ClientError as s3error:
response = s3error.code, {}, s3error.description
@ -1130,94 +1124,6 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return s3error.code, {}, s3error.description
return status_code, response_headers, response_content
def _control_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method
if hasattr(request, "body"):
# Boto
body = request.body
if hasattr(body, "read"):
body = body.read()
else:
# Flask server
body = request.data
if body is None:
body = b""
if method == "GET":
return self._control_response_get(request, query, headers)
elif method == "PUT":
return self._control_response_put(request, body, query, headers)
elif method == "DELETE":
return self._control_response_delete(request, query, headers)
else:
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(
method
)
)
def _control_response_get(self, request, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "GET", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
public_block_config = self.backend.get_account_public_access_block(
headers["x-amz-account-id"]
)
template = self.response_template(S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION)
return (
200,
response_headers,
template.render(public_block_config=public_block_config),
)
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _control_response_put(self, request, body, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "PUT", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
pab_config = self._parse_pab_config(body)
self.backend.put_account_public_access_block(
headers["x-amz-account-id"],
pab_config["PublicAccessBlockConfiguration"],
)
return 200, response_headers, ""
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _control_response_delete(self, request, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "DELETE", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
self.backend.delete_account_public_access_block(headers["x-amz-account-id"])
return 200, response_headers, ""
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _key_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query, keep_blank_values=True)

View File

@ -2,9 +2,10 @@ from moto import settings
from .responses import S3ResponseInstance
# Catch s3.amazonaws.com, but not s3-control.amazonaws.com
url_bases = [
r"https?://s3(.*)\.amazonaws.com",
r"https?://(?P<bucket_name>[a-zA-Z0-9\-_.]*)\.?s3(.*)\.amazonaws.com",
r"https?://s3(?!-control)(.*)\.amazonaws.com",
r"https?://(?P<bucket_name>[a-zA-Z0-9\-_.]*)\.?s3(?!-control)(.*)\.amazonaws.com",
]
url_bases.extend(settings.get_s3_custom_endpoints())
@ -15,7 +16,7 @@ url_paths = {
# subdomain key of path-based bucket
"{0}/(?P<key_or_bucket_name>[^/]+)/?$": S3ResponseInstance.ambiguous_response,
# path-based bucket + key
"{0}/(?P<bucket_name_path>[^/]+)/(?P<key_name>.+)": S3ResponseInstance.key_or_control_response,
"{0}/(?P<bucket_name_path>[^/]+)/(?P<key_name>.+)": S3ResponseInstance.key_response,
# subdomain bucket + key with empty first part of path
"{0}/(?P<key_name>/.*)$": S3ResponseInstance.key_or_control_response,
"{0}/(?P<key_name>/.*)$": S3ResponseInstance.key_response,
}

View File

@ -0,0 +1,6 @@
"""s3control module initialization; sets value for base decorator."""
from .models import s3control_backend
from ..core.models import base_decorator
s3control_backends = {"global": s3control_backend}
mock_s3control = base_decorator(s3control_backends)

155
moto/s3control/config.py Normal file
View File

@ -0,0 +1,155 @@
import datetime
import json
import time
from boto3 import Session
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.s3control import s3control_backends
from moto.s3.models import get_moto_s3_account_id
class S3AccountPublicAccessBlockConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# For the Account Public Access Block, they are the same for all regions. The resource ID is the AWS account ID
# There is no resource name -- it should be a blank string "" if provided.
# The resource name can only ever be None or an empty string:
if resource_name is not None and resource_name != "":
return [], None
pab = None
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# If a resource ID was passed in, then filter accordingly:
if resource_ids:
for resource_id in resource_ids:
if account_id == resource_id:
pab = self.backends["global"].public_access_block
break
# Otherwise, just grab the one from the backend:
if not resource_ids:
pab = self.backends["global"].public_access_block
# If it's not present, then return nothing
if not pab:
return [], None
# Filter on regions (and paginate on them as well):
if backend_region:
pab_list = [backend_region]
elif resource_region:
# Invalid region?
if resource_region not in regions:
return [], None
pab_list = [resource_region]
# Aggregated query where no regions were supplied so return them all:
else:
pab_list = regions
# Pagination logic:
sorted_regions = sorted(pab_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the region-name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_regions:
raise InvalidNextTokenException()
start = sorted_regions.index(next_token)
# Get the list of items to collect:
pab_list = sorted_regions[start : (start + limit)]
if len(sorted_regions) > (start + limit):
new_token = sorted_regions[start + limit]
return (
[
{
"type": "AWS::S3::AccountPublicAccessBlock",
"id": account_id,
"region": region,
}
for region in pab_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# Do we even have this defined?
if not self.backends["global"].public_access_block:
return None
# Resource name can only ever be "" if it's supplied:
if resource_name is not None and resource_name != "":
return None
# Are we filtering based on region?
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# Is the resource ID correct?:
if account_id == resource_id:
if backend_region:
pab_region = backend_region
# Invalid region?
elif resource_region not in regions:
return None
else:
pab_region = resource_region
else:
return None
# Format the PAB to the AWS Config format:
creation_time = datetime.datetime.utcnow()
config_data = {
"version": "1.3",
"accountId": account_id,
"configurationItemCaptureTime": str(creation_time),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(creation_time.timetuple()))
), # PY2 and 3 compatible
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": account_id,
"awsRegion": pab_region,
"availabilityZone": "Not Applicable",
"configuration": self.backends[
"global"
].public_access_block.to_config_dict(),
"supplementaryConfiguration": {},
}
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
return config_data
s3_account_public_access_block_query = S3AccountPublicAccessBlockConfigQuery(
s3control_backends
)

61
moto/s3control/models.py Normal file
View File

@ -0,0 +1,61 @@
from moto.core import ACCOUNT_ID, BaseBackend
from moto.s3.exceptions import (
WrongPublicAccessBlockAccountIdError,
NoSuchPublicAccessBlockConfiguration,
InvalidPublicAccessBlockConfiguration,
)
from moto.s3.models import PublicAccessBlock
class S3ControlBackend(BaseBackend):
"""
S3-Control cannot be accessed via the MotoServer without a modification of the hosts file on your system.
This is due to the fact that the URL to the host is in the form of:
ACCOUNT_ID.s3-control.amazonaws.com
That Account ID part is the problem. If you want to make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost` and this will work fine.
"""
def __init__(self, region_name=None):
self.region_name = region_name
self.public_access_block = None
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def get_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not self.public_access_block:
raise NoSuchPublicAccessBlockConfiguration()
return self.public_access_block
def delete_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
self.public_access_block = None
def put_public_access_block(self, account_id, pub_block_config):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not pub_block_config:
raise InvalidPublicAccessBlockConfiguration()
self.public_access_block = PublicAccessBlock(
pub_block_config.get("BlockPublicAcls"),
pub_block_config.get("IgnorePublicAcls"),
pub_block_config.get("BlockPublicPolicy"),
pub_block_config.get("RestrictPublicBuckets"),
)
s3control_backend = S3ControlBackend()

View File

@ -0,0 +1,54 @@
import json
import xmltodict
from moto.core.responses import BaseResponse
from moto.core.utils import amzn_request_id
from moto.s3.exceptions import S3ClientError
from moto.s3.responses import S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION
from .models import s3control_backend
class S3ControlResponse(BaseResponse):
@classmethod
def public_access_block(cls, request, full_url, headers):
response_instance = S3ControlResponse()
try:
return response_instance._public_access_block(request, headers)
except S3ClientError as err:
return err.code, {}, err.description
@amzn_request_id
def _public_access_block(self, request, headers):
if request.method == "GET":
return self.get_public_access_block(headers)
elif request.method == "PUT":
return self.put_public_access_block(request, headers)
elif request.method == "DELETE":
return self.delete_public_access_block(headers)
def get_public_access_block(self, headers):
account_id = headers["x-amz-account-id"]
public_block_config = s3control_backend.get_public_access_block(
account_id=account_id,
)
template = self.response_template(S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION)
return 200, {}, template.render(public_block_config=public_block_config)
def put_public_access_block(self, request, headers):
account_id = headers["x-amz-account-id"]
pab_config = self._parse_pab_config(request.body)
s3control_backend.put_public_access_block(
account_id, pab_config["PublicAccessBlockConfiguration"]
)
return 201, {}, json.dumps({})
def delete_public_access_block(self, headers):
account_id = headers["x-amz-account-id"]
s3control_backend.delete_public_access_block(account_id=account_id,)
return 204, {}, json.dumps({})
def _parse_pab_config(self, body):
parsed_xml = xmltodict.parse(body)
parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None)
return parsed_xml

11
moto/s3control/urls.py Normal file
View File

@ -0,0 +1,11 @@
"""s3control base URL and path."""
from .responses import S3ControlResponse
url_bases = [
r"https?://([0-9]+)\.s3-control\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/v20180820/configuration/publicAccessBlock$": S3ControlResponse.public_access_block,
}

View File

@ -45,4 +45,4 @@ def test_domain_dispatched_with_service():
dispatcher = DomainDispatcherApplication(create_backend_app, service="s3")
backend_app = dispatcher.get_application({"HTTP_HOST": "s3.us-east1.amazonaws.com"})
keys = set(backend_app.view_functions.keys())
keys.should.contain("ResponseObject.key_or_control_response")
keys.should.contain("ResponseObject.key_response")

View File

@ -1,6 +1,5 @@
import datetime
import os
from boto3 import Session
from urllib.parse import urlparse, parse_qs
from functools import wraps
from gzip import GzipFile
@ -1418,376 +1417,6 @@ if not settings.TEST_SERVER_MODE:
"The unspecified location constraint is incompatible for the region specific endpoint this request was sent to."
)
# All tests for s3-control cannot be run under the server without a modification of the
# hosts file on your system. This is due to the fact that the URL to the host is in the form of:
# ACCOUNT_ID.s3-control.amazonaws.com <-- That Account ID part is the problem. If you want to
# make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost`
# and this will work fine.
@mock_s3
def test_get_public_access_block_for_account():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
# With an invalid account ID:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Without one defined:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
# Put a with an invalid account ID:
with pytest.raises(ClientError) as ce:
client.put_public_access_block(
AccountId="111111111111",
PublicAccessBlockConfiguration={"BlockPublicAcls": True},
)
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Put with an invalid PAB:
with pytest.raises(ClientError) as ce:
client.put_public_access_block(
AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={}
)
assert ce.value.response["Error"]["Code"] == "InvalidRequest"
assert (
"Must specify at least one configuration."
in ce.value.response["Error"]["Message"]
)
# Correct PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the correct PAB (for all regions):
for region in Session().get_available_regions("s3control"):
region_client = boto3.client("s3control", region_name=region)
assert region_client.get_public_access_block(AccountId=ACCOUNT_ID)[
"PublicAccessBlockConfiguration"
] == {
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
# Delete with an invalid account ID:
with pytest.raises(ClientError) as ce:
client.delete_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Delete successfully:
client.delete_public_access_block(AccountId=ACCOUNT_ID)
# Confirm that it's deleted:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
@mock_s3
@mock_config
def test_config_list_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert not result["resourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
assert not result["ResourceIdentifiers"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Test that successful queries work (non-aggregated):
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock",
resourceIds=[ACCOUNT_ID, "nope"],
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName=""
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
# Test that successful queries work (aggregated):
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
regions = {region for region in Session().get_available_regions("config")}
for r in result["ResourceIdentifiers"]:
regions.remove(r.pop("SourceRegion"))
assert r == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
}
# Just check that the len is the same -- this should be reasonable
regions = {region for region in Session().get_available_regions("config")}
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": ""},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "", "ResourceId": ACCOUNT_ID},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={
"ResourceName": "",
"ResourceId": ACCOUNT_ID,
"Region": "us-west-2",
},
)
assert (
result["ResourceIdentifiers"][0]["SourceRegion"] == "us-west-2"
and len(result["ResourceIdentifiers"]) == 1
)
# Test aggregator pagination:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
)
regions = sorted(
[region for region in Session().get_available_regions("config")]
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[0],
}
assert result["NextToken"] == regions[1]
# Get the next region:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
NextToken=regions[1],
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[1],
}
# Non-aggregated with incorrect info:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName="nope"
)
assert not result["resourceIdentifiers"]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceIds=["nope"]
)
assert not result["resourceIdentifiers"]
# Aggregated with incorrect info:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceId": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"Region": "Nope"},
)
assert not result["ResourceIdentifiers"]
@mock_s3
@mock_config
def test_config_get_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
with pytest.raises(ClientError) as ce:
config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert ce.value.response["Error"]["Code"] == "ResourceNotDiscoveredException"
# aggregate
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": "ACCOUNT_ID",
}
]
)
assert not result["baseConfigurationItems"]
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": "us-west-2",
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert not result["BaseConfigurationItems"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the proper config:
proper_config = {
"blockPublicAcls": True,
"ignorePublicAcls": True,
"blockPublicPolicy": True,
"restrictPublicBuckets": True,
}
result = config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert (
json.loads(result["configurationItems"][0]["configuration"])
== proper_config
)
assert (
result["configurationItems"][0]["accountId"]
== result["configurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
)
assert len(result["baseConfigurationItems"]) == 1
assert (
json.loads(result["baseConfigurationItems"][0]["configuration"])
== proper_config
)
assert (
result["baseConfigurationItems"][0]["accountId"]
== result["baseConfigurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
for region in Session().get_available_regions("s3control"):
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": region,
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert len(result["BaseConfigurationItems"]) == 1
assert (
json.loads(result["BaseConfigurationItems"][0]["configuration"])
== proper_config
)
@mock_s3
def test_ranged_get_boto3():

View File

View File

@ -0,0 +1,90 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from boto3 import Session
from botocore.client import ClientError
from moto import settings, mock_s3control
# All tests for s3-control cannot be run under the server without a modification of the
# hosts file on your system. This is due to the fact that the URL to the host is in the form of:
# ACCOUNT_ID.s3-control.amazonaws.com <-- That Account ID part is the problem. If you want to
# make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost`
# and this will work fine.
if not settings.TEST_SERVER_MODE:
@mock_s3control
def test_get_public_access_block_for_account():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
# With an invalid account ID:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Without one defined:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
# Put a with an invalid account ID:
with pytest.raises(ClientError) as ce:
client.put_public_access_block(
AccountId="111111111111",
PublicAccessBlockConfiguration={"BlockPublicAcls": True},
)
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Put with an invalid PAB:
with pytest.raises(ClientError) as ce:
client.put_public_access_block(
AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={}
)
assert ce.value.response["Error"]["Code"] == "InvalidRequest"
assert (
"Must specify at least one configuration."
in ce.value.response["Error"]["Message"]
)
# Correct PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the correct PAB (for all regions):
for region in Session().get_available_regions("s3control"):
region_client = boto3.client("s3control", region_name=region)
assert region_client.get_public_access_block(AccountId=ACCOUNT_ID)[
"PublicAccessBlockConfiguration"
] == {
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
# Delete with an invalid account ID:
with pytest.raises(ClientError) as ce:
client.delete_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied"
# Delete successfully:
client.delete_public_access_block(AccountId=ACCOUNT_ID)
# Confirm that it's deleted:
with pytest.raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)

View File

@ -0,0 +1,305 @@
import boto3
import json
import pytest
import sure # noqa # pylint: disable=unused-import
from boto3 import Session
from botocore.client import ClientError
from moto import settings, mock_s3control, mock_config
# All tests for s3-control cannot be run under the server without a modification of the
# hosts file on your system. This is due to the fact that the URL to the host is in the form of:
# ACCOUNT_ID.s3-control.amazonaws.com <-- That Account ID part is the problem. If you want to
# make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost`
# and this will work fine.
if not settings.TEST_SERVER_MODE:
@mock_s3control
@mock_config
def test_config_list_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert not result["resourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
assert not result["ResourceIdentifiers"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Test that successful queries work (non-aggregated):
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock",
resourceIds=[ACCOUNT_ID, "nope"],
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName=""
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
# Test that successful queries work (aggregated):
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
regions = {region for region in Session().get_available_regions("config")}
for r in result["ResourceIdentifiers"]:
regions.remove(r.pop("SourceRegion"))
assert r == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
}
# Just check that the len is the same -- this should be reasonable
regions = {region for region in Session().get_available_regions("config")}
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": ""},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "", "ResourceId": ACCOUNT_ID},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={
"ResourceName": "",
"ResourceId": ACCOUNT_ID,
"Region": "us-west-2",
},
)
assert (
result["ResourceIdentifiers"][0]["SourceRegion"] == "us-west-2"
and len(result["ResourceIdentifiers"]) == 1
)
# Test aggregator pagination:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
)
regions = sorted(
[region for region in Session().get_available_regions("config")]
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[0],
}
assert result["NextToken"] == regions[1]
# Get the next region:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
NextToken=regions[1],
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[1],
}
# Non-aggregated with incorrect info:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName="nope"
)
assert not result["resourceIdentifiers"]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceIds=["nope"]
)
assert not result["resourceIdentifiers"]
# Aggregated with incorrect info:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceId": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"Region": "Nope"},
)
assert not result["ResourceIdentifiers"]
@mock_s3control
@mock_config
def test_config_get_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
with pytest.raises(ClientError) as ce:
config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert ce.value.response["Error"]["Code"] == "ResourceNotDiscoveredException"
# aggregate
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": "ACCOUNT_ID",
}
]
)
assert not result["baseConfigurationItems"]
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": "us-west-2",
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert not result["BaseConfigurationItems"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the proper config:
proper_config = {
"blockPublicAcls": True,
"ignorePublicAcls": True,
"blockPublicPolicy": True,
"restrictPublicBuckets": True,
}
result = config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert (
json.loads(result["configurationItems"][0]["configuration"])
== proper_config
)
assert (
result["configurationItems"][0]["accountId"]
== result["configurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
)
assert len(result["baseConfigurationItems"]) == 1
assert (
json.loads(result["baseConfigurationItems"][0]["configuration"])
== proper_config
)
assert (
result["baseConfigurationItems"][0]["accountId"]
== result["baseConfigurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
for region in Session().get_available_regions("s3control"):
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": region,
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert len(result["BaseConfigurationItems"]) == 1
assert (
json.loads(result["BaseConfigurationItems"][0]["configuration"])
== proper_config
)

View File

@ -0,0 +1,103 @@
"""Test that using both s3 and s3control do not interfere"""
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_s3, mock_s3control, settings
from moto.core import ACCOUNT_ID
if not settings.TEST_SERVER_MODE:
@mock_s3
@mock_s3control
def test_pab_are_kept_separate():
client = boto3.client("s3control", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket")
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
s3.put_public_access_block(
Bucket="bucket",
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": False,
},
)
pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID)
pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
)
pab_from_s3 = s3.get_public_access_block(Bucket="bucket")
pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": False,
}
)
@mock_s3control
@mock_s3
def test_pab_are_kept_separate_with_inverse_mocks():
client = boto3.client("s3control", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket")
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
s3.put_public_access_block(
Bucket="bucket",
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": False,
},
)
pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID)
pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
)
pab_from_s3 = s3.get_public_access_block(Bucket="bucket")
pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals(
{
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": False,
}
)