moto/moto/s3/config.py

271 lines
8.9 KiB
Python
Raw Normal View History

import datetime
import json
import time
from boto3 import Session
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.s3 import s3_backends
from moto.s3.models import get_moto_s3_account_id
class S3ConfigQuery(ConfigQueryModel):
2019-10-31 08:44:26 -07:00
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
2020-08-07 22:34:59 -06:00
aggregator=None,
2019-10-31 08:44:26 -07:00
):
# The resource_region only matters for aggregated queries as you can filter on bucket regions for them.
# For other resource types, you would need to iterate appropriately for the backend_region.
# Resource IDs are the same as S3 bucket names
# For aggregation -- did we get both a resource ID and a resource name?
if resource_ids and resource_name:
# If the values are different, then return an empty list:
if resource_name not in resource_ids:
return [], None
# If no filter was passed in for resource names/ids then return them all:
if not resource_ids and not resource_name:
2019-10-31 08:44:26 -07:00
bucket_list = list(self.backends["global"].buckets.keys())
else:
# Match the resource name / ID:
bucket_list = []
filter_buckets = [resource_name] if resource_name else resource_ids
2019-10-31 08:44:26 -07:00
for bucket in self.backends["global"].buckets.keys():
if bucket in filter_buckets:
bucket_list.append(bucket)
# Filter on the proper region if supplied:
region_filter = backend_region or resource_region
if region_filter:
region_buckets = []
for bucket in bucket_list:
2019-10-31 08:44:26 -07:00
if self.backends["global"].buckets[bucket].region_name == region_filter:
region_buckets.append(bucket)
bucket_list = region_buckets
if not bucket_list:
return [], None
# Pagination logic:
sorted_buckets = sorted(bucket_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the bucket name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_buckets:
raise InvalidNextTokenException()
start = sorted_buckets.index(next_token)
# Get the list of items to collect:
2019-10-31 08:44:26 -07:00
bucket_list = sorted_buckets[start : (start + limit)]
if len(sorted_buckets) > (start + limit):
new_token = sorted_buckets[start + limit]
2019-10-31 08:44:26 -07:00
return (
[
{
"type": "AWS::S3::Bucket",
"id": bucket,
"name": bucket,
"region": self.backends["global"].buckets[bucket].region_name,
}
for bucket in bucket_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# Get the bucket:
2019-10-31 08:44:26 -07:00
bucket = self.backends["global"].buckets.get(resource_id, {})
if not bucket:
return
# Are we filtering based on region?
region_filter = backend_region or resource_region
if region_filter and bucket.region_name != region_filter:
return
# Are we also filtering on bucket name?
if resource_name and bucket.name != resource_name:
return
# Format the bucket to the AWS Config format:
config_data = bucket.to_config_dict()
# The 'configuration' field is also a JSON string:
2019-10-31 08:44:26 -07:00
config_data["configuration"] = json.dumps(config_data["configuration"])
# Supplementary config need all values converted to JSON strings if they are not strings already:
2019-10-31 08:44:26 -07:00
for field, value in config_data["supplementaryConfiguration"].items():
if not isinstance(value, str):
2019-10-31 08:44:26 -07:00
config_data["supplementaryConfiguration"][field] = json.dumps(value)
return config_data
class S3AccountPublicAccessBlockConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
2020-08-07 22:34:59 -06:00
aggregator=None,
):
# For the Account Public Access Block, they are the same for all regions. The resource ID is the AWS account ID
# There is no resource name -- it should be a blank string "" if provided.
# The resource name can only ever be None or an empty string:
if resource_name is not None and resource_name != "":
return [], None
pab = None
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# If a resource ID was passed in, then filter accordingly:
if resource_ids:
for id in resource_ids:
if account_id == id:
pab = self.backends["global"].account_public_access_block
break
# Otherwise, just grab the one from the backend:
if not resource_ids:
pab = self.backends["global"].account_public_access_block
# If it's not present, then return nothing
if not pab:
return [], None
# Filter on regions (and paginate on them as well):
if backend_region:
pab_list = [backend_region]
elif resource_region:
# Invalid region?
if resource_region not in regions:
return [], None
pab_list = [resource_region]
# Aggregated query where no regions were supplied so return them all:
else:
pab_list = regions
# Pagination logic:
sorted_regions = sorted(pab_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the region-name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_regions:
raise InvalidNextTokenException()
start = sorted_regions.index(next_token)
# Get the list of items to collect:
pab_list = sorted_regions[start : (start + limit)]
if len(sorted_regions) > (start + limit):
new_token = sorted_regions[start + limit]
return (
[
{
"type": "AWS::S3::AccountPublicAccessBlock",
"id": account_id,
"region": region,
}
for region in pab_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# Do we even have this defined?
if not self.backends["global"].account_public_access_block:
return None
# Resource name can only ever be "" if it's supplied:
if resource_name is not None and resource_name != "":
return None
# Are we filtering based on region?
account_id = get_moto_s3_account_id()
regions = [region for region in Session().get_available_regions("config")]
# Is the resource ID correct?:
if account_id == resource_id:
if backend_region:
pab_region = backend_region
# Invalid region?
elif resource_region not in regions:
return None
else:
pab_region = resource_region
else:
return None
# Format the PAB to the AWS Config format:
creation_time = datetime.datetime.utcnow()
config_data = {
"version": "1.3",
"accountId": account_id,
"configurationItemCaptureTime": str(creation_time),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(creation_time.timetuple()))
), # PY2 and 3 compatible
"resourceType": "AWS::S3::AccountPublicAccessBlock",
"resourceId": account_id,
"awsRegion": pab_region,
"availabilityZone": "Not Applicable",
"configuration": self.backends[
"global"
].account_public_access_block.to_config_dict(),
"supplementaryConfiguration": {},
}
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
return config_data
s3_config_query = S3ConfigQuery(s3_backends)
s3_account_public_access_block_query = S3AccountPublicAccessBlockConfigQuery(
s3_backends
)