Implemented S3 Account-level public access block.

- Also added AWS Config listing and fetching support
- Also fixed Lambda test breakage
This commit is contained in:
Mike Grima 2020-02-13 18:01:44 -08:00
parent d463c11793
commit 11b7be0e85
12 changed files with 746 additions and 51 deletions

View File

@ -26,11 +26,12 @@ install:
fi fi
docker run --rm -t --name motoserver -e TEST_SERVER_MODE=true -e AWS_SECRET_ACCESS_KEY=server_secret -e AWS_ACCESS_KEY_ID=server_key -v `pwd`:/moto -p 5000:5000 -v /var/run/docker.sock:/var/run/docker.sock python:${PYTHON_DOCKER_TAG} /moto/travis_moto_server.sh & docker run --rm -t --name motoserver -e TEST_SERVER_MODE=true -e AWS_SECRET_ACCESS_KEY=server_secret -e AWS_ACCESS_KEY_ID=server_key -v `pwd`:/moto -p 5000:5000 -v /var/run/docker.sock:/var/run/docker.sock python:${PYTHON_DOCKER_TAG} /moto/travis_moto_server.sh &
fi fi
travis_retry pip install -r requirements-dev.txt
travis_retry pip install boto==2.45.0 travis_retry pip install boto==2.45.0
travis_retry pip install boto3 travis_retry pip install boto3
travis_retry pip install dist/moto*.gz travis_retry pip install dist/moto*.gz
travis_retry pip install coveralls==1.1 travis_retry pip install coveralls==1.1
travis_retry pip install -r requirements-dev.txt travis_retry pip install coverage==4.5.4
if [ "$TEST_SERVER_MODE" = "true" ]; then if [ "$TEST_SERVER_MODE" = "true" ]; then
python wait_for.py python wait_for.py

View File

@ -450,6 +450,16 @@ boto3.resource(
) )
``` ```
### Caveats
The standalone server has some caveats with some services. The following services
require that you update your hosts file for your code to work properly:
1. `s3-control`
For the above services, this is required because the hostname is in the form of `AWS_ACCOUNT_ID.localhost`.
As a result, you need to add that entry to your host file for your tests to function properly.
## Install ## Install

View File

@ -43,7 +43,7 @@ from moto.config.exceptions import (
) )
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_config_query from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
@ -58,7 +58,10 @@ POP_STRINGS = [
DEFAULT_PAGE_SIZE = 100 DEFAULT_PAGE_SIZE = 100
# Map the Config resource type to a backend: # Map the Config resource type to a backend:
RESOURCE_MAP = {"AWS::S3::Bucket": s3_config_query} RESOURCE_MAP = {
"AWS::S3::Bucket": s3_config_query,
"AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query,
}
def datetime2int(date): def datetime2int(date):
@ -867,16 +870,17 @@ class ConfigBackend(BaseBackend):
backend_region=backend_query_region, backend_region=backend_query_region,
) )
result = { resource_identifiers = []
"resourceIdentifiers": [ for identifier in identifiers:
{ item = {"resourceType": identifier["type"], "resourceId": identifier["id"]}
"resourceType": identifier["type"],
"resourceId": identifier["id"], # Some resource types lack names:
"resourceName": identifier["name"], if identifier.get("name"):
} item["resourceName"] = identifier["name"]
for identifier in identifiers
] resource_identifiers.append(item)
}
result = {"resourceIdentifiers": resource_identifiers}
if new_token: if new_token:
result["nextToken"] = new_token result["nextToken"] = new_token
@ -927,19 +931,22 @@ class ConfigBackend(BaseBackend):
resource_region=resource_region, resource_region=resource_region,
) )
result = { resource_identifiers = []
"ResourceIdentifiers": [ for identifier in identifiers:
{ item = {
"SourceAccountId": DEFAULT_ACCOUNT_ID, "SourceAccountId": DEFAULT_ACCOUNT_ID,
"SourceRegion": identifier["region"], "SourceRegion": identifier["region"],
"ResourceType": identifier["type"], "ResourceType": identifier["type"],
"ResourceId": identifier["id"], "ResourceId": identifier["id"],
"ResourceName": identifier["name"],
}
for identifier in identifiers
]
} }
if identifier.get("name"):
item["ResourceName"] = identifier["name"]
resource_identifiers.append(item)
result = {"ResourceIdentifiers": resource_identifiers}
if new_token: if new_token:
result["NextToken"] = new_token result["NextToken"] = new_token

View File

@ -606,12 +606,13 @@ class ConfigQueryModel(object):
As such, the proper way to implement is to first obtain a full list of results from all the region backends, and then filter As such, the proper way to implement is to first obtain a full list of results from all the region backends, and then filter
from there. It may be valuable to make this a concatenation of the region and resource name. from there. It may be valuable to make this a concatenation of the region and resource name.
:param resource_region: :param resource_ids: A list of resource IDs
:param resource_ids: :param resource_name: The individual name of a resource
:param resource_name: :param limit: How many per page
:param limit: :param next_token: The item that will page on
:param next_token:
:param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query. :param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query.
:param resource_region: The region for where the resources reside to pull results from. Set to `None` if this is a
non-aggregated query.
:return: This should return a list of Dicts that have the following fields: :return: This should return a list of Dicts that have the following fields:
[ [
{ {

View File

@ -1,8 +1,13 @@
import datetime
import json import json
import time
from boto3 import Session
from moto.core.exceptions import InvalidNextTokenException from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel from moto.core.models import ConfigQueryModel
from moto.s3 import s3_backends from moto.s3 import s3_backends
from moto.s3.models import get_moto_s3_account_id
class S3ConfigQuery(ConfigQueryModel): class S3ConfigQuery(ConfigQueryModel):
@ -118,4 +123,146 @@ class S3ConfigQuery(ConfigQueryModel):
return config_data return config_data
class S3AccountPublicAccessBlockConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
):
# For the Account Public Access Block, they are the same for all regions. The resource ID is the AWS account ID
# There is no resource name -- it should be a blank string "" if provided.
# The resource name can only ever be None or an empty string:
if resource_name is not None and resource_name != "":
return [], None
pab = None
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# If a resource ID was passed in, then filter accordingly:
if resource_ids:
for id in resource_ids:
if account_id == id:
pab = self.backends["global"].account_public_access_block
break
# Otherwise, just grab the one from the backend:
if not resource_ids:
pab = self.backends["global"].account_public_access_block
# If it's not present, then return nothing
if not pab:
return [], None
# Filter on regions (and paginate on them as well):
if backend_region:
pab_list = [backend_region]
elif resource_region:
# Invalid region?
if resource_region not in regions:
return [], None
pab_list = [resource_region]
# Aggregated query where no regions were supplied so return them all:
else:
pab_list = regions
# Pagination logic:
sorted_regions = sorted(pab_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the region-name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_regions:
raise InvalidNextTokenException()
start = sorted_regions.index(next_token)
# Get the list of items to collect:
pab_list = sorted_regions[start : (start + limit)]
if len(sorted_regions) > (start + limit):
new_token = sorted_regions[start + limit]
return (
[
{
"type": "AWS::S3::AccountPublicAccessBlock",
"id": account_id,
"region": region,
}
for region in pab_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# Do we even have this defined?
if not self.backends["global"].account_public_access_block:
return None
# Resource name can only ever be "" if it's supplied:
if resource_name is not None and resource_name != "":
return None
# Are we filtering based on region?
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# Is the resource ID correct?:
if account_id == resource_id:
if backend_region:
pab_region = backend_region
# Invalid region?
elif resource_region not in regions:
return None
else:
pab_region = resource_region
else:
return None
# Format the PAB to the AWS Config format:
creation_time = datetime.datetime.utcnow()
config_data = {
"version": "1.3",
"accountId": account_id,
"configurationItemCaptureTime": str(creation_time),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(creation_time.timetuple()))
), # PY2 and 3 compatible
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": account_id,
"awsRegion": pab_region,
"availabilityZone": "Not Applicable",
"configuration": self.backends[
"global"
].account_public_access_block.to_config_dict(),
"supplementaryConfiguration": {},
}
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
return config_data
s3_config_query = S3ConfigQuery(s3_backends) s3_config_query = S3ConfigQuery(s3_backends)
s3_account_public_access_block_query = S3AccountPublicAccessBlockConfigQuery(
s3_backends
)

View File

@ -359,3 +359,12 @@ class InvalidPublicAccessBlockConfiguration(S3ClientError):
*args, *args,
**kwargs **kwargs
) )
class WrongPublicAccessBlockAccountIdError(S3ClientError):
code = 403
def __init__(self):
super(WrongPublicAccessBlockAccountIdError, self).__init__(
"AccessDenied", "Access Denied"
)

View File

@ -19,7 +19,7 @@ import uuid
import six import six
from bisect import insort from bisect import insort
from moto.core import BaseBackend, BaseModel from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime
from .exceptions import ( from .exceptions import (
BucketAlreadyExists, BucketAlreadyExists,
@ -37,6 +37,7 @@ from .exceptions import (
CrossLocationLoggingProhibitted, CrossLocationLoggingProhibitted,
NoSuchPublicAccessBlockConfiguration, NoSuchPublicAccessBlockConfiguration,
InvalidPublicAccessBlockConfiguration, InvalidPublicAccessBlockConfiguration,
WrongPublicAccessBlockAccountIdError,
) )
from .utils import clean_key_name, _VersionedKeyStore from .utils import clean_key_name, _VersionedKeyStore
@ -58,6 +59,13 @@ DEFAULT_TEXT_ENCODING = sys.getdefaultencoding()
OWNER = "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a" OWNER = "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
def get_moto_s3_account_id():
"""This makes it easy for mocking AWS Account IDs when using AWS Config
-- Simply mock.patch the ACCOUNT_ID here, and Config gets it for free.
"""
return ACCOUNT_ID
class FakeDeleteMarker(BaseModel): class FakeDeleteMarker(BaseModel):
def __init__(self, key): def __init__(self, key):
self.key = key self.key = key
@ -1163,6 +1171,7 @@ class FakeBucket(BaseModel):
class S3Backend(BaseBackend): class S3Backend(BaseBackend):
def __init__(self): def __init__(self):
self.buckets = {} self.buckets = {}
self.account_public_access_block = None
def create_bucket(self, bucket_name, region_name): def create_bucket(self, bucket_name, region_name):
if bucket_name in self.buckets: if bucket_name in self.buckets:
@ -1264,6 +1273,16 @@ class S3Backend(BaseBackend):
return bucket.public_access_block return bucket.public_access_block
def get_account_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not self.account_public_access_block:
raise NoSuchPublicAccessBlockConfiguration()
return self.account_public_access_block
def set_key( def set_key(
self, bucket_name, key_name, value, storage=None, etag=None, multipart=None self, bucket_name, key_name, value, storage=None, etag=None, multipart=None
): ):
@ -1356,6 +1375,13 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.public_access_block = None bucket.public_access_block = None
def delete_account_public_access_block(self, account_id):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
self.account_public_access_block = None
def put_bucket_notification_configuration(self, bucket_name, notification_config): def put_bucket_notification_configuration(self, bucket_name, notification_config):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.set_notification_configuration(notification_config) bucket.set_notification_configuration(notification_config)
@ -1384,6 +1410,21 @@ class S3Backend(BaseBackend):
pub_block_config.get("RestrictPublicBuckets"), pub_block_config.get("RestrictPublicBuckets"),
) )
def put_account_public_access_block(self, account_id, pub_block_config):
# The account ID should equal the account id that is set for Moto:
if account_id != ACCOUNT_ID:
raise WrongPublicAccessBlockAccountIdError()
if not pub_block_config:
raise InvalidPublicAccessBlockConfiguration()
self.account_public_access_block = PublicAccessBlock(
pub_block_config.get("BlockPublicAcls"),
pub_block_config.get("IgnorePublicAcls"),
pub_block_config.get("BlockPublicPolicy"),
pub_block_config.get("RestrictPublicBuckets"),
)
def initiate_multipart(self, bucket_name, key_name, metadata): def initiate_multipart(self, bucket_name, key_name, metadata):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
new_multipart = FakeMultipart(key_name, metadata) new_multipart = FakeMultipart(key_name, metadata)

View File

@ -4,6 +4,7 @@ import re
import sys import sys
import six import six
from botocore.awsrequest import AWSPreparedRequest
from moto.core.utils import str_to_rfc_1123_datetime, py2_strip_unicode_keys from moto.core.utils import str_to_rfc_1123_datetime, py2_strip_unicode_keys
from six.moves.urllib.parse import parse_qs, urlparse, unquote from six.moves.urllib.parse import parse_qs, urlparse, unquote
@ -123,6 +124,11 @@ ACTION_MAP = {
"uploadId": "PutObject", "uploadId": "PutObject",
}, },
}, },
"CONTROL": {
"GET": {"publicAccessBlock": "GetPublicAccessBlock"},
"PUT": {"publicAccessBlock": "PutPublicAccessBlock"},
"DELETE": {"publicAccessBlock": "DeletePublicAccessBlock"},
},
} }
@ -220,7 +226,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
# Depending on which calling format the client is using, we don't know # Depending on which calling format the client is using, we don't know
# if this is a bucket or key request so we have to check # if this is a bucket or key request so we have to check
if self.subdomain_based_buckets(request): if self.subdomain_based_buckets(request):
return self.key_response(request, full_url, headers) return self.key_or_control_response(request, full_url, headers)
else: else:
# Using path-based buckets # Using path-based buckets
return self.bucket_response(request, full_url, headers) return self.bucket_response(request, full_url, headers)
@ -287,7 +293,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return self._bucket_response_post(request, body, bucket_name) return self._bucket_response_post(request, body, bucket_name)
else: else:
raise NotImplementedError( raise NotImplementedError(
"Method {0} has not been impelemented in the S3 backend yet".format( "Method {0} has not been implemented in the S3 backend yet".format(
method method
) )
) )
@ -595,6 +601,20 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
pass pass
return False return False
def _parse_pab_config(self, body):
parsed_xml = xmltodict.parse(body)
parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None)
# If Python 2, fix the unicode strings:
if sys.version_info[0] < 3:
parsed_xml = {
"PublicAccessBlockConfiguration": py2_strip_unicode_keys(
dict(parsed_xml["PublicAccessBlockConfiguration"])
)
}
return parsed_xml
def _bucket_response_put( def _bucket_response_put(
self, request, body, region_name, bucket_name, querystring self, request, body, region_name, bucket_name, querystring
): ):
@ -673,19 +693,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
raise e raise e
elif "publicAccessBlock" in querystring: elif "publicAccessBlock" in querystring:
parsed_xml = xmltodict.parse(body) pab_config = self._parse_pab_config(body)
parsed_xml["PublicAccessBlockConfiguration"].pop("@xmlns", None)
# If Python 2, fix the unicode strings:
if sys.version_info[0] < 3:
parsed_xml = {
"PublicAccessBlockConfiguration": py2_strip_unicode_keys(
dict(parsed_xml["PublicAccessBlockConfiguration"])
)
}
self.backend.put_bucket_public_access_block( self.backend.put_bucket_public_access_block(
bucket_name, parsed_xml["PublicAccessBlockConfiguration"] bucket_name, pab_config["PublicAccessBlockConfiguration"]
) )
return "" return ""
@ -870,14 +880,20 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
) )
return 206, response_headers, response_content[begin : end + 1] return 206, response_headers, response_content[begin : end + 1]
def key_response(self, request, full_url, headers): def key_or_control_response(self, request, full_url, headers):
# Key and Control are lumped in because splitting out the regex is too much of a pain :/
self.method = request.method self.method = request.method
self.path = self._get_path(request) self.path = self._get_path(request)
self.headers = request.headers self.headers = request.headers
if "host" not in self.headers: if "host" not in self.headers:
self.headers["host"] = urlparse(full_url).netloc self.headers["host"] = urlparse(full_url).netloc
response_headers = {} response_headers = {}
try: try:
# Is this an S3 control response?
if isinstance(request, AWSPreparedRequest) and "s3-control" in request.url:
response = self._control_response(request, full_url, headers)
else:
response = self._key_response(request, full_url, headers) response = self._key_response(request, full_url, headers)
except S3ClientError as s3error: except S3ClientError as s3error:
response = s3error.code, {}, s3error.description response = s3error.code, {}, s3error.description
@ -894,6 +910,94 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
) )
return status_code, response_headers, response_content return status_code, response_headers, response_content
def _control_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query, keep_blank_values=True)
method = request.method
if hasattr(request, "body"):
# Boto
body = request.body
if hasattr(body, "read"):
body = body.read()
else:
# Flask server
body = request.data
if body is None:
body = b""
if method == "GET":
return self._control_response_get(request, query, headers)
elif method == "PUT":
return self._control_response_put(request, body, query, headers)
elif method == "DELETE":
return self._control_response_delete(request, query, headers)
else:
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(
method
)
)
def _control_response_get(self, request, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "GET", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
public_block_config = self.backend.get_account_public_access_block(
headers["x-amz-account-id"]
)
template = self.response_template(S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION)
return (
200,
response_headers,
template.render(public_block_config=public_block_config),
)
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _control_response_put(self, request, body, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "PUT", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
pab_config = self._parse_pab_config(body)
self.backend.put_account_public_access_block(
headers["x-amz-account-id"],
pab_config["PublicAccessBlockConfiguration"],
)
return 200, response_headers, ""
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _control_response_delete(self, request, query, headers):
action = self.path.split("?")[0].split("/")[
-1
] # Gets the action out of the URL sans query params.
self._set_action("CONTROL", "DELETE", action)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if "publicAccessBlock" in action:
self.backend.delete_account_public_access_block(headers["x-amz-account-id"])
return 200, response_headers, ""
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(action)
)
def _key_response(self, request, full_url, headers): def _key_response(self, request, full_url, headers):
parsed_url = urlparse(full_url) parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query, keep_blank_values=True) query = parse_qs(parsed_url.query, keep_blank_values=True)

View File

@ -13,7 +13,7 @@ url_paths = {
# subdomain key of path-based bucket # subdomain key of path-based bucket
"{0}/(?P<key_or_bucket_name>[^/]+)/?$": S3ResponseInstance.ambiguous_response, "{0}/(?P<key_or_bucket_name>[^/]+)/?$": S3ResponseInstance.ambiguous_response,
# path-based bucket + key # path-based bucket + key
"{0}/(?P<bucket_name_path>[^/]+)/(?P<key_name>.+)": S3ResponseInstance.key_response, "{0}/(?P<bucket_name_path>[^/]+)/(?P<key_name>.+)": S3ResponseInstance.key_or_control_response,
# subdomain bucket + key with empty first part of path # subdomain bucket + key with empty first part of path
"{0}//(?P<key_name>.*)$": S3ResponseInstance.key_response, "{0}//(?P<key_name>.*)$": S3ResponseInstance.key_or_control_response,
} }

View File

@ -150,7 +150,7 @@ def test_invoke_requestresponse_function_with_arn():
Payload=json.dumps(in_data), Payload=json.dumps(in_data),
) )
success_result["StatusCode"].should.equal(202) success_result["StatusCode"].should.equal(200)
result_obj = json.loads( result_obj = json.loads(
base64.b64decode(success_result["LogResult"]).decode("utf-8") base64.b64decode(success_result["LogResult"]).decode("utf-8")
) )

View File

@ -46,4 +46,4 @@ def test_domain_dispatched_with_service():
dispatcher = DomainDispatcherApplication(create_backend_app, service="s3") dispatcher = DomainDispatcherApplication(create_backend_app, service="s3")
backend_app = dispatcher.get_application({"HTTP_HOST": "s3.us-east1.amazonaws.com"}) backend_app = dispatcher.get_application({"HTTP_HOST": "s3.us-east1.amazonaws.com"})
keys = set(backend_app.view_functions.keys()) keys = set(backend_app.view_functions.keys())
keys.should.contain("ResponseObject.key_response") keys.should.contain("ResponseObject.key_or_control_response")

View File

@ -5,6 +5,7 @@ import datetime
import os import os
import sys import sys
from boto3 import Session
from six.moves.urllib.request import urlopen from six.moves.urllib.request import urlopen
from six.moves.urllib.error import HTTPError from six.moves.urllib.error import HTTPError
from functools import wraps from functools import wraps
@ -1135,6 +1136,380 @@ if not settings.TEST_SERVER_MODE:
"The unspecified location constraint is incompatible for the region specific endpoint this request was sent to." "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to."
) )
# All tests for s3-control cannot be run under the server without a modification of the
# hosts file on your system. This is due to the fact that the URL to the host is in the form of:
# ACCOUNT_ID.s3-control.amazonaws.com <-- That Account ID part is the problem. If you want to
# make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost`
# and this will work fine.
@mock_s3
def test_get_public_access_block_for_account():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
# With an invalid account ID:
with assert_raises(ClientError) as ce:
client.get_public_access_block(AccountId="111111111111")
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
# Without one defined:
with assert_raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.exception.response["Error"]["Code"]
== "NoSuchPublicAccessBlockConfiguration"
)
# Put a with an invalid account ID:
with assert_raises(ClientError) as ce:
client.put_public_access_block(
AccountId="111111111111",
PublicAccessBlockConfiguration={"BlockPublicAcls": True},
)
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
# Put with an invalid PAB:
with assert_raises(ClientError) as ce:
client.put_public_access_block(
AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={}
)
assert ce.exception.response["Error"]["Code"] == "InvalidRequest"
assert (
"Must specify at least one configuration."
in ce.exception.response["Error"]["Message"]
)
# Correct PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the correct PAB (for all regions):
for region in Session().get_available_regions("s3control"):
region_client = boto3.client("s3control", region_name=region)
assert region_client.get_public_access_block(AccountId=ACCOUNT_ID)[
"PublicAccessBlockConfiguration"
] == {
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
}
# Delete with an invalid account ID:
with assert_raises(ClientError) as ce:
client.delete_public_access_block(AccountId="111111111111")
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
# Delete successfully:
client.delete_public_access_block(AccountId=ACCOUNT_ID)
# Confirm that it's deleted:
with assert_raises(ClientError) as ce:
client.get_public_access_block(AccountId=ACCOUNT_ID)
assert (
ce.exception.response["Error"]["Code"]
== "NoSuchPublicAccessBlockConfiguration"
)
@mock_s3
@mock_config
def test_config_list_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert not result["resourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
assert not result["ResourceIdentifiers"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Test that successful queries work (non-aggregated):
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock"
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock",
resourceIds=[ACCOUNT_ID, "nope"],
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName=""
)
assert result["resourceIdentifiers"] == [
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
# Test that successful queries work (aggregated):
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
)
regions = {region for region in Session().get_available_regions("config")}
for r in result["ResourceIdentifiers"]:
regions.remove(r.pop("SourceRegion"))
assert r == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
}
# Just check that the len is the same -- this should be reasonable
regions = {region for region in Session().get_available_regions("config")}
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": ""},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "", "ResourceId": ACCOUNT_ID},
)
assert len(regions) == len(result["ResourceIdentifiers"])
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={
"ResourceName": "",
"ResourceId": ACCOUNT_ID,
"Region": "us-west-2",
},
)
assert (
result["ResourceIdentifiers"][0]["SourceRegion"] == "us-west-2"
and len(result["ResourceIdentifiers"]) == 1
)
# Test aggregator pagination:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
)
regions = sorted(
[region for region in Session().get_available_regions("config")]
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[0],
}
assert result["NextToken"] == regions[1]
# Get the next region:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Limit=1,
NextToken=regions[1],
)
assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID,
"SourceRegion": regions[1],
}
# Non-aggregated with incorrect info:
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName="nope"
)
assert not result["resourceIdentifiers"]
result = config_client.list_discovered_resources(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceIds=["nope"]
)
assert not result["resourceIdentifiers"]
# Aggregated with incorrect info:
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceName": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"ResourceId": "nope"},
)
assert not result["ResourceIdentifiers"]
result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing",
Filters={"Region": "Nope"},
)
assert not result["ResourceIdentifiers"]
@mock_s3
@mock_config
def test_config_get_account_pab():
from moto.s3.models import ACCOUNT_ID
client = boto3.client("s3control", region_name="us-west-2")
config_client = boto3.client("config", region_name="us-west-2")
# Create the aggregator:
account_aggregation_source = {
"AccountIds": [ACCOUNT_ID],
"AllAwsRegions": True,
}
config_client.put_configuration_aggregator(
ConfigurationAggregatorName="testing",
AccountAggregationSources=[account_aggregation_source],
)
# Without a PAB in place:
with assert_raises(ClientError) as ce:
config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert (
ce.exception.response["Error"]["Code"] == "ResourceNotDiscoveredException"
)
# aggregate
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": "ACCOUNT_ID",
}
]
)
assert not result["baseConfigurationItems"]
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": "us-west-2",
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert not result["BaseConfigurationItems"]
# Create a PAB:
client.put_public_access_block(
AccountId=ACCOUNT_ID,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# Get the proper config:
proper_config = {
"blockPublicAcls": True,
"ignorePublicAcls": True,
"blockPublicPolicy": True,
"restrictPublicBuckets": True,
}
result = config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
)
assert (
json.loads(result["configurationItems"][0]["configuration"])
== proper_config
)
assert (
result["configurationItems"][0]["accountId"]
== result["configurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
result = config_client.batch_get_resource_config(
resourceKeys=[
{
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": ACCOUNT_ID,
}
]
)
assert len(result["baseConfigurationItems"]) == 1
assert (
json.loads(result["baseConfigurationItems"][0]["configuration"])
== proper_config
)
assert (
result["baseConfigurationItems"][0]["accountId"]
== result["baseConfigurationItems"][0]["resourceId"]
== ACCOUNT_ID
)
for region in Session().get_available_regions("s3control"):
result = config_client.batch_get_aggregate_resource_config(
ConfigurationAggregatorName="testing",
ResourceIdentifiers=[
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": region,
"ResourceId": ACCOUNT_ID,
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
"ResourceName": "",
}
],
)
assert len(result["BaseConfigurationItems"]) == 1
assert (
json.loads(result["BaseConfigurationItems"][0]["configuration"])
== proper_config
)
@mock_s3_deprecated @mock_s3_deprecated
def test_ranged_get(): def test_ranged_get():