Adding support for querying the AWS Config resource for S3.

- Need to add support still for batch requests and batch aggregation requests in a future PR
This commit is contained in:
Mike Grima 2019-10-03 18:00:07 -07:00
parent e71c06738c
commit 2a2c3e80f0
9 changed files with 746 additions and 23 deletions

View File

@ -87,7 +87,7 @@ that look like this:
]
```
It's recommended to read the comment for the `ConfigQueryModel` [base class here](moto/core/models.py).
It's recommended to read the comment for the `ConfigQueryModel`'s `list_config_service_resources` function in [base class here](moto/core/models.py).
^^ The AWS Config code will see this and format it correct for both aggregated and non-aggregated calls.
@ -102,6 +102,19 @@ An example of a working implementation of this is [S3](moto/s3/config.py).
Pagination should generally be able to pull out the resource across any region so should be sharded by `region-item-name` -- not done for S3
because S3 has a globally unique name space.
### Describing Resources
TODO: Need to fill this in when it's implemented
Fetching a resource's configuration has some similarities to listing resources, but it requires more work (to implement). Due to the
various ways that a resource can be configured, some work will need to be done to ensure that the Config dict returned is correct.
For most resource types the following is true:
1. There are regional backends with their own sets of data
1. Config aggregation can pull data from any backend region -- we assume that everything lives in the same account
The current implementation is for S3. S3 is very complex and depending on how the bucket is configured will depend on what Config will
return for it.
When implementing resource config fetching, you will need to return at a minimum `None` if the resource is not found, or a `dict` that looks
like what AWS Config would return.
It's recommended to read the comment for the `ConfigQueryModel` 's `get_config_resource` function in [base class here](moto/core/models.py).

View File

@ -254,3 +254,25 @@ class TooManyResourceIds(JsonRESTError):
def __init__(self):
super(TooManyResourceIds, self).__init__('ValidationException', "The specified list had more than 20 resource ID's. "
"It must have '20' or less items")
class ResourceNotDiscoveredException(JsonRESTError):
code = 400
def __init__(self, type, resource):
super(ResourceNotDiscoveredException, self).__init__('ResourceNotDiscoveredException',
'Resource {resource} of resourceType:{type} is unknown or has not been '
'discovered'.format(resource=resource, type=type))
class TooManyResourceKeys(JsonRESTError):
code = 400
def __init__(self, bad_list):
message = '1 validation error detected: Value \'{bad_list}\' at ' \
'\'resourceKeys\' failed to satisfy constraint: ' \
'Member must have length less than or equal to 100'.format(bad_list=bad_list)
# For PY2:
message = str(message)
super(TooManyResourceKeys, self).__init__("ValidationException", message)

View File

@ -17,7 +17,8 @@ from moto.config.exceptions import InvalidResourceTypeException, InvalidDelivery
InvalidSNSTopicARNException, MaxNumberOfDeliveryChannelsExceededException, NoAvailableDeliveryChannelException, \
NoSuchDeliveryChannelException, LastDeliveryChannelDeleteFailedException, TagKeyTooBig, \
TooManyTags, TagValueTooBig, TooManyAccountSources, InvalidParameterValueException, InvalidNextTokenException, \
NoSuchConfigurationAggregatorException, InvalidTagCharacters, DuplicateTags, InvalidLimit, InvalidResourceParameters, TooManyResourceIds
NoSuchConfigurationAggregatorException, InvalidTagCharacters, DuplicateTags, InvalidLimit, InvalidResourceParameters, \
TooManyResourceIds, ResourceNotDiscoveredException
from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_config_query
@ -790,6 +791,39 @@ class ConfigBackend(BaseBackend):
return result
def get_resource_config_history(self, type, id, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!)
As such, the later_time, earlier_time, limit, and next_token are ignored as this will only
return 1 item. (If no items, it raises an exception)
"""
# If the type isn't implemented then we won't find the item:
if type not in RESOURCE_MAP:
raise ResourceNotDiscoveredException(type, id)
# Is the resource type global?
if RESOURCE_MAP[type].backends.get('global'):
backend_region = 'global'
# If the backend region isn't implemented then we won't find the item:
if not RESOURCE_MAP[type].backends.get(backend_region):
raise ResourceNotDiscoveredException(type, id)
# Get the item:
item = RESOURCE_MAP[type].get_config_resource(id, backend_region=backend_region)
if not item:
raise ResourceNotDiscoveredException(type, id)
item['accountId'] = DEFAULT_ACCOUNT_ID
return {'configurationItems': [item]}
def batch_get_resource_config(self, resource_keys, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend."""
# Can't have more than 100 items
pass
config_backends = {}
boto3_session = Session()

View File

@ -102,6 +102,12 @@ class ConfigResponse(BaseResponse):
self._get_param('NextToken'))
return json.dumps(schema)
def get_resource_config_history(self):
schema = self.config_backend.get_resource_config_history(self._get_param('resourceType'),
self._get_param('resourceId'),
self.region)
return json.dumps(schema)
"""
def batch_get_resource_config(self):
# TODO implement me!
@ -110,8 +116,4 @@ class ConfigResponse(BaseResponse):
def batch_get_aggregate_resource_config(self):
# TODO implement me!
return ""
def get_resource_config_history(self):
# TODO implement me!
return ""
"""

View File

@ -554,7 +554,7 @@ class ConfigQueryModel(object):
This supports both aggregated and non-aggregated listing. The following notes the difference:
- Non Aggregated Listing -
- Non-Aggregated Listing -
This only lists resources within a region. The way that this is implemented in moto is based on the region
for the resource backend.
@ -593,8 +593,31 @@ class ConfigQueryModel(object):
"""
raise NotImplementedError()
def get_config_resource(self):
"""TODO implement me."""
def get_config_resource(self, resource_id, resource_name=None, backend_region=None, resource_region=None):
"""For AWS Config. This will query the backend for the specific resource type configuration.
This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests
will call this function N times to fetch the N objects needing to be fetched.
- Non-Aggregated Fetching -
This only fetches a resource config within a region. The way that this is implemented in moto is based on the region
for the resource backend.
You must set the `backend_region` to the region that the API request arrived from. `resource_region` should be set to `None`.
- Aggregated Fetching -
This fetches resources from all potential regional backends. For non-global resource types, this should collect a full
list of resources from all the backends, and then be able to filter from the resource region. This is because an
aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation
from all resources in all regions for a given resource type*.
...
:param resource_id:
:param resource_name:
:param backend_region:
:param resource_region:
:return:
"""
raise NotImplementedError()

View File

@ -1,3 +1,5 @@
import json
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.s3 import s3_backends
@ -66,5 +68,35 @@ class S3ConfigQuery(ConfigQueryModel):
return [{'type': 'AWS::S3::Bucket', 'id': bucket, 'name': bucket, 'region': self.backends['global'].buckets[bucket].region_name}
for bucket in bucket_list], new_token
def get_config_resource(self, resource_id, resource_name=None, backend_region=None, resource_region=None):
# backend_region is ignored for S3 as the backend is 'global'
# Get the bucket:
bucket = self.backends['global'].buckets.get(resource_id, {})
if not bucket:
return
# Are we filtering based on region?
if resource_region and bucket.region_name != resource_region:
return
# Are we also filtering on bucket name?
if resource_name and bucket.name != resource_name:
return
# Format the bucket to the AWS Config format:
config_data = bucket.to_config_dict()
# The 'configuration' field is also a JSON string:
config_data['configuration'] = json.dumps(config_data['configuration'])
# Supplementary config need all values converted to JSON strings if they are not strings already:
for field, value in config_data['supplementaryConfiguration'].items():
if not isinstance(value, str):
config_data['supplementaryConfiguration'][field] = json.dumps(value)
return config_data
s3_config_query = S3ConfigQuery(s3_backends)

View File

@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import base64
import datetime
@ -10,6 +12,7 @@ import random
import string
import tempfile
import sys
import time
import uuid
import six
@ -32,6 +35,7 @@ STORAGE_CLASS = ["STANDARD", "REDUCED_REDUNDANCY", "STANDARD_IA", "ONEZONE_IA",
"INTELLIGENT_TIERING", "GLACIER", "DEEP_ARCHIVE"]
DEFAULT_KEY_BUFFER_SIZE = 16 * 1024 * 1024
DEFAULT_TEXT_ENCODING = sys.getdefaultencoding()
OWNER = '75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a'
class FakeDeleteMarker(BaseModel):
@ -316,6 +320,14 @@ PERMISSION_READ = 'READ'
PERMISSION_WRITE_ACP = 'WRITE_ACP'
PERMISSION_READ_ACP = 'READ_ACP'
CAMEL_CASED_PERMISSIONS = {
'FULL_CONTROL': 'FullControl',
'WRITE': 'Write',
'READ': 'Read',
'WRITE_ACP': 'WriteAcp',
'READ_ACP': 'ReadAcp'
}
class FakeGrant(BaseModel):
@ -346,10 +358,43 @@ class FakeAcl(BaseModel):
def __repr__(self):
return "FakeAcl(grants: {})".format(self.grants)
def to_config_dict(self):
"""Returns the object into the format expected by AWS Config"""
data = {
'grantSet': None, # Always setting this to None. Feel free to change.
'owner': {'displayName': None, 'id': OWNER}
}
# Add details for each Grant:
grant_list = []
for grant in self.grants:
permissions = grant.permissions if isinstance(grant.permissions, list) else [grant.permissions]
for permission in permissions:
for grantee in grant.grantees:
# Config does not add the owner if its permissions are FULL_CONTROL:
if permission == 'FULL_CONTROL' and grantee.id == OWNER:
continue
if grantee.uri:
grant_list.append({'grantee': grantee.uri.split('http://acs.amazonaws.com/groups/s3/')[1],
'permission': CAMEL_CASED_PERMISSIONS[permission]})
else:
grant_list.append({
'grantee': {
'id': grantee.id,
'displayName': None if not grantee.display_name else grantee.display_name
},
'permission': CAMEL_CASED_PERMISSIONS[permission]
})
if grant_list:
data['grantList'] = grant_list
return data
def get_canned_acl(acl):
owner_grantee = FakeGrantee(
id='75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a')
owner_grantee = FakeGrantee(id=OWNER)
grants = [FakeGrant([owner_grantee], [PERMISSION_FULL_CONTROL])]
if acl == 'private':
pass # no other permissions
@ -401,6 +446,34 @@ class LifecycleFilter(BaseModel):
self.tag = tag
self.and_filter = and_filter
def to_config_dict(self):
if self.prefix is not None:
return {
'predicate': {
'type': 'LifecyclePrefixPredicate',
'prefix': self.prefix
}
}
elif self.tag:
return {
'predicate': {
'type': 'LifecycleTagPredicate',
'tag': {
'key': self.tag.key,
'value': self.tag.value
}
}
}
else:
return {
'predicate': {
'type': 'LifecycleAndOperator',
'operands': self.and_filter.to_config_dict()
}
}
class LifecycleAndFilter(BaseModel):
@ -408,6 +481,17 @@ class LifecycleAndFilter(BaseModel):
self.prefix = prefix
self.tags = tags
def to_config_dict(self):
data = []
if self.prefix is not None:
data.append({'type': 'LifecyclePrefixPredicate', 'prefix': self.prefix})
for tag in self.tags:
data.append({'type': 'LifecycleTagPredicate', 'tag': {'key': tag.key, 'value': tag.value}})
return data
class LifecycleRule(BaseModel):
@ -430,6 +514,46 @@ class LifecycleRule(BaseModel):
self.nvt_storage_class = nvt_storage_class
self.aimu_days = aimu_days
def to_config_dict(self):
"""Converts the object to the AWS Config data dict.
Note: The following are missing that should be added in the future:
- transitions (returns None for now)
- noncurrentVersionTransitions (returns None for now)
- LifeCycle Filters that are NOT prefix
:param kwargs:
:return:
"""
lifecycle_dict = {
'id': self.id,
'prefix': self.prefix,
'status': self.status,
'expirationInDays': self.expiration_days,
'expiredObjectDeleteMarker': self.expired_object_delete_marker,
'noncurrentVersionExpirationInDays': -1 or self.nve_noncurrent_days,
'expirationDate': self.expiration_date,
'transitions': None, # Replace me with logic to fill in
'noncurrentVersionTransitions': None, # Replace me with logic to fill in
}
if self.aimu_days:
lifecycle_dict['abortIncompleteMultipartUpload'] = {'daysAfterInitiation': self.aimu_days}
else:
lifecycle_dict['abortIncompleteMultipartUpload'] = None
# Format the filter:
if self.prefix is None and self.filter is None:
lifecycle_dict['filter'] = {'predicate': None}
elif self.prefix:
lifecycle_dict['filter'] = None
else:
lifecycle_dict['filter'] = self.filter.to_config_dict()
return lifecycle_dict
class CorsRule(BaseModel):
@ -450,6 +574,23 @@ class Notification(BaseModel):
self.events = events
self.filters = filters if filters else {}
def to_config_dict(self):
data = {}
# Type and ARN will be filled in by NotificationConfiguration's to_config_dict:
data['events'] = [event for event in self.events]
if self.filters:
data['filter'] = {'s3KeyFilter': {'filterRules': [
{'name': fr['Name'], 'value': fr['Value']} for fr in self.filters['S3Key']['FilterRule']
]}}
else:
data['filter'] = None
data['objectPrefixes'] = [] # Not sure why this is a thing since AWS just seems to return this as filters ¯\_(ツ)_/¯
return data
class NotificationConfiguration(BaseModel):
@ -461,6 +602,29 @@ class NotificationConfiguration(BaseModel):
self.cloud_function = [Notification(c["CloudFunction"], c["Event"], filters=c.get("Filter"), id=c.get("Id"))
for c in cloud_function] if cloud_function else []
def to_config_dict(self):
data = {'configurations': {}}
for topic in self.topic:
topic_config = topic.to_config_dict()
topic_config['topicARN'] = topic.arn
topic_config['type'] = 'TopicConfiguration'
data['configurations'][topic.id] = topic_config
for queue in self.queue:
queue_config = queue.to_config_dict()
queue_config['queueARN'] = queue.arn
queue_config['type'] = 'QueueConfiguration'
data['configurations'][queue.id] = queue_config
for cloud_function in self.cloud_function:
cf_config = cloud_function.to_config_dict()
cf_config['queueARN'] = cloud_function.arn
cf_config['type'] = 'LambdaConfiguration'
data['configurations'][cloud_function.id] = cf_config
return data
class FakeBucket(BaseModel):
@ -735,6 +899,67 @@ class FakeBucket(BaseModel):
bucket = s3_backend.create_bucket(resource_name, region_name)
return bucket
def to_config_dict(self):
"""Return the AWS Config JSON format of this S3 bucket.
Note: The following features are not implemented and will need to be if you care about them:
- Bucket Accelerate Configuration
"""
config_dict = {
'version': '1.3',
'configurationItemCaptureTime': str(self.creation_date),
'configurationItemStatus': 'ResourceDiscovered',
'configurationStateId': str(int(time.mktime(self.creation_date.timetuple()))), # PY2 and 3 compatible
'configurationItemMD5Hash': '',
'arn': "arn:aws:s3:::{}".format(self.name),
'resourceType': 'AWS::S3::Bucket',
'resourceId': self.name,
'resourceName': self.name,
'awsRegion': self.region_name,
'availabilityZone': 'Regional',
'resourceCreationTime': str(self.creation_date),
'relatedEvents': [],
'relationships': [],
'tags': {tag.key: tag.value for tag in self.tagging.tag_set.tags},
'configuration': {
'name': self.name,
'owner': {'id': OWNER},
'creationDate': self.creation_date.isoformat()
}
}
# Make the supplementary configuration:
# TODO: Implement Public Access Block Support
s_config = {'AccessControlList': self.acl.to_config_dict()}
# TODO implement Accelerate Configuration:
s_config['BucketAccelerateConfiguration'] = {'status': None}
if self.rules:
s_config['BucketLifecycleConfiguration'] = {
"rules": [rule.to_config_dict() for rule in self.rules]
}
s_config['BucketLoggingConfiguration'] = {
'destinationBucketName': self.logging.get('TargetBucket', None),
'logFilePrefix': self.logging.get('TargetPrefix', None)
}
s_config['BucketPolicy'] = {
'policyText': self.policy if self.policy else None
}
s_config['IsRequesterPaysEnabled'] = 'false' if self.payer == 'BucketOwner' else 'true'
if self.notification_configuration:
s_config['BucketNotificationConfiguration'] = self.notification_configuration.to_config_dict()
else:
s_config['BucketNotificationConfiguration'] = {'configurations': {}}
config_dict['supplementaryConfiguration'] = s_config
return config_dict
class S3Backend(BaseBackend):

View File

@ -1184,3 +1184,35 @@ def test_list_aggregate_discovered_resource():
with assert_raises(ClientError) as ce:
client.list_aggregate_discovered_resources(ConfigurationAggregatorName='testing', ResourceType='AWS::S3::Bucket', Limit=101)
assert '101' in ce.exception.response['Error']['Message']
@mock_config
@mock_s3
def test_get_resource_config_history():
"""NOTE: We are only really testing the Config part. For each individual service, please add tests
for that individual service's "get_config_resource" function.
"""
client = boto3.client('config', region_name='us-west-2')
# With an invalid resource type:
with assert_raises(ClientError) as ce:
client.get_resource_config_history(resourceType='NOT::A::RESOURCE', resourceId='notcreatedyet')
assert ce.exception.response['Error'] == {'Message': 'Resource notcreatedyet of resourceType:NOT::A::RESOURCE is unknown or has '
'not been discovered', 'Code': 'ResourceNotDiscoveredException'}
# With nothing created yet:
with assert_raises(ClientError) as ce:
client.get_resource_config_history(resourceType='AWS::S3::Bucket', resourceId='notcreatedyet')
assert ce.exception.response['Error'] == {'Message': 'Resource notcreatedyet of resourceType:AWS::S3::Bucket is unknown or has '
'not been discovered', 'Code': 'ResourceNotDiscoveredException'}
# Create an S3 bucket:
s3_client = boto3.client('s3', region_name='us-west-2')
for x in range(0, 10):
s3_client.create_bucket(Bucket='bucket{}'.format(x), CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
# Now try:
result = client.get_resource_config_history(resourceType='AWS::S3::Bucket', resourceId='bucket1')['configurationItems']
assert len(result) == 1
assert result[0]['resourceName'] == result[0]['resourceId'] == 'bucket1'
assert result[0]['arn'] == 'arn:aws:s3:::bucket1'

View File

@ -289,8 +289,8 @@ def test_multipart_etag_quotes_stripped():
part2 = b'1'
etag2 = multipart.upload_part_from_file(BytesIO(part2), 2).etag
# Strip quotes from etags
etag1 = etag1.replace('"','')
etag2 = etag2.replace('"','')
etag1 = etag1.replace('"', '')
etag2 = etag2.replace('"', '')
xml = "<Part><PartNumber>{0}</PartNumber><ETag>{1}</ETag></Part>"
xml = xml.format(1, etag1) + xml.format(2, etag2)
xml = "<CompleteMultipartUpload>{0}</CompleteMultipartUpload>".format(xml)
@ -1592,7 +1592,8 @@ def test_boto3_copy_object_with_versioning():
response = client.create_multipart_upload(Bucket='blah', Key='test4')
upload_id = response['UploadId']
response = client.upload_part_copy(Bucket='blah', Key='test4', CopySource={'Bucket': 'blah', 'Key': 'test3', 'VersionId': obj3_version_new},
response = client.upload_part_copy(Bucket='blah', Key='test4',
CopySource={'Bucket': 'blah', 'Key': 'test3', 'VersionId': obj3_version_new},
UploadId=upload_id, PartNumber=1)
etag = response["CopyPartResult"]["ETag"]
client.complete_multipart_upload(
@ -2284,7 +2285,7 @@ def test_put_bucket_notification():
assert not result.get("QueueConfigurations")
assert result["LambdaFunctionConfigurations"][0]["Id"]
assert result["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] == \
"arn:aws:lambda:us-east-1:012345678910:function:lambda"
"arn:aws:lambda:us-east-1:012345678910:function:lambda"
assert result["LambdaFunctionConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert len(result["LambdaFunctionConfigurations"][0]["Events"]) == 1
assert len(result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"]) == 1
@ -2367,7 +2368,7 @@ def test_put_bucket_notification_errors():
assert err.exception.response["Error"]["Code"] == "InvalidArgument"
assert err.exception.response["Error"]["Message"] == \
"The notification destination service region is not valid for the bucket location constraint"
"The notification destination service region is not valid for the bucket location constraint"
# Invalid event name:
with assert_raises(ClientError) as err:
@ -2949,7 +2950,7 @@ TEST_XML = """\
def test_boto3_bucket_name_too_long():
s3 = boto3.client('s3', region_name='us-east-1')
with assert_raises(ClientError) as exc:
s3.create_bucket(Bucket='x'*64)
s3.create_bucket(Bucket='x' * 64)
exc.exception.response['Error']['Code'].should.equal('InvalidBucketName')
@ -2957,7 +2958,7 @@ def test_boto3_bucket_name_too_long():
def test_boto3_bucket_name_too_short():
s3 = boto3.client('s3', region_name='us-east-1')
with assert_raises(ClientError) as exc:
s3.create_bucket(Bucket='x'*2)
s3.create_bucket(Bucket='x' * 2)
exc.exception.response['Error']['Code'].should.equal('InvalidBucketName')
@ -2979,7 +2980,7 @@ def test_can_enable_bucket_acceleration():
Bucket=bucket_name,
AccelerateConfiguration={'Status': 'Enabled'},
)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp = s3.get_bucket_accelerate_configuration(Bucket=bucket_name)
resp.should.have.key('Status')
resp['Status'].should.equal('Enabled')
@ -2998,7 +2999,7 @@ def test_can_suspend_bucket_acceleration():
Bucket=bucket_name,
AccelerateConfiguration={'Status': 'Suspended'},
)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp = s3.get_bucket_accelerate_configuration(Bucket=bucket_name)
resp.should.have.key('Status')
resp['Status'].should.equal('Suspended')
@ -3013,7 +3014,7 @@ def test_suspending_acceleration_on_not_configured_bucket_does_nothing():
Bucket=bucket_name,
AccelerateConfiguration={'Status': 'Suspended'},
)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp.keys().should.have.length_of(1) # Response contains nothing (only HTTP headers)
resp = s3.get_bucket_accelerate_configuration(Bucket=bucket_name)
resp.shouldnt.have.key('Status')
@ -3173,3 +3174,342 @@ def test_list_config_discovered_resources():
s3_config_query.list_config_service_resources(None, None, 1, 'notabucket')
assert 'The nextToken provided is invalid' in inte.exception.message
@mock_s3
def test_s3_lifecycle_config_dict():
from moto.s3.config import s3_config_query
# With 1 bucket in us-west-2:
s3_config_query.backends['global'].create_bucket('bucket1', 'us-west-2')
# And a lifecycle policy
lifecycle = [
{
'ID': 'rule1',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'Expiration': {'Days': 1}
},
{
'ID': 'rule2',
'Status': 'Enabled',
'Filter': {
'And': {
'Prefix': 'some/path',
'Tag': [
{'Key': 'TheKey', 'Value': 'TheValue'}
]
}
},
'Expiration': {'Days': 1}
},
{
'ID': 'rule3',
'Status': 'Enabled',
'Filter': {},
'Expiration': {'Days': 1}
},
{
'ID': 'rule4',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1}
}
]
s3_config_query.backends['global'].set_bucket_lifecycle('bucket1', lifecycle)
# Get the rules for this:
lifecycles = [rule.to_config_dict() for rule in s3_config_query.backends['global'].buckets['bucket1'].rules]
# Verify the first:
assert lifecycles[0] == {
'id': 'rule1',
'prefix': None,
'status': 'Enabled',
'expirationInDays': 1,
'expiredObjectDeleteMarker': None,
'noncurrentVersionExpirationInDays': -1,
'expirationDate': None,
'transitions': None,
'noncurrentVersionTransitions': None,
'abortIncompleteMultipartUpload': None,
'filter': {
'predicate': {
'type': 'LifecyclePrefixPredicate',
'prefix': ''
}
}
}
# Verify the second:
assert lifecycles[1] == {
'id': 'rule2',
'prefix': None,
'status': 'Enabled',
'expirationInDays': 1,
'expiredObjectDeleteMarker': None,
'noncurrentVersionExpirationInDays': -1,
'expirationDate': None,
'transitions': None,
'noncurrentVersionTransitions': None,
'abortIncompleteMultipartUpload': None,
'filter': {
'predicate': {
'type': 'LifecycleAndOperator',
'operands': [
{
'type': 'LifecyclePrefixPredicate',
'prefix': 'some/path'
},
{
'type': 'LifecycleTagPredicate',
'tag': {
'key': 'TheKey',
'value': 'TheValue'
}
},
]
}
}
}
# And the third:
assert lifecycles[2] == {
'id': 'rule3',
'prefix': None,
'status': 'Enabled',
'expirationInDays': 1,
'expiredObjectDeleteMarker': None,
'noncurrentVersionExpirationInDays': -1,
'expirationDate': None,
'transitions': None,
'noncurrentVersionTransitions': None,
'abortIncompleteMultipartUpload': None,
'filter': {'predicate': None}
}
# And the last:
assert lifecycles[3] == {
'id': 'rule4',
'prefix': None,
'status': 'Enabled',
'expirationInDays': None,
'expiredObjectDeleteMarker': None,
'noncurrentVersionExpirationInDays': -1,
'expirationDate': None,
'transitions': None,
'noncurrentVersionTransitions': None,
'abortIncompleteMultipartUpload': {'daysAfterInitiation': 1},
'filter': {
'predicate': {
'type': 'LifecyclePrefixPredicate',
'prefix': ''
}
}
}
@mock_s3
def test_s3_notification_config_dict():
from moto.s3.config import s3_config_query
# With 1 bucket in us-west-2:
s3_config_query.backends['global'].create_bucket('bucket1', 'us-west-2')
# And some notifications:
notifications = {
'TopicConfiguration': [{
'Id': 'Topic',
"Topic": 'arn:aws:sns:us-west-2:012345678910:mytopic',
"Event": [
"s3:ReducedRedundancyLostObject",
"s3:ObjectRestore:Completed"
]
}],
'QueueConfiguration': [{
'Id': 'Queue',
'Queue': 'arn:aws:sqs:us-west-2:012345678910:myqueue',
'Event': [
"s3:ObjectRemoved:Delete"
],
'Filter': {
'S3Key': {
'FilterRule': [
{
'Name': 'prefix',
'Value': 'stuff/here/'
}
]
}
}
}],
'CloudFunctionConfiguration': [{
'Id': 'Lambda',
'CloudFunction': 'arn:aws:lambda:us-west-2:012345678910:function:mylambda',
'Event': [
"s3:ObjectCreated:Post",
"s3:ObjectCreated:Copy",
"s3:ObjectCreated:Put"
],
'Filter': {
'S3Key': {
'FilterRule': [
{
'Name': 'suffix',
'Value': '.png'
}
]
}
}
}]
}
s3_config_query.backends['global'].put_bucket_notification_configuration('bucket1', notifications)
# Get the notifications for this:
notifications = s3_config_query.backends['global'].buckets['bucket1'].notification_configuration.to_config_dict()
# Verify it all:
assert notifications == {
'configurations': {
'Topic': {
'events': ['s3:ReducedRedundancyLostObject', 's3:ObjectRestore:Completed'],
'filter': None,
'objectPrefixes': [],
'topicARN': 'arn:aws:sns:us-west-2:012345678910:mytopic',
'type': 'TopicConfiguration'
},
'Queue': {
'events': ['s3:ObjectRemoved:Delete'],
'filter': {
's3KeyFilter': {
'filterRules': [{
'name': 'prefix',
'value': 'stuff/here/'
}]
}
},
'objectPrefixes': [],
'queueARN': 'arn:aws:sqs:us-west-2:012345678910:myqueue',
'type': 'QueueConfiguration'
},
'Lambda': {
'events': ['s3:ObjectCreated:Post', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Put'],
'filter': {
's3KeyFilter': {
'filterRules': [{
'name': 'suffix',
'value': '.png'
}]
}
},
'objectPrefixes': [],
'queueARN': 'arn:aws:lambda:us-west-2:012345678910:function:mylambda',
'type': 'LambdaConfiguration'
}
}
}
@mock_s3
def test_s3_acl_to_config_dict():
from moto.s3.config import s3_config_query
from moto.s3.models import FakeAcl, FakeGrant, FakeGrantee, OWNER
# With 1 bucket in us-west-2:
s3_config_query.backends['global'].create_bucket('logbucket', 'us-west-2')
# Get the config dict with nothing other than the owner details:
acls = s3_config_query.backends['global'].buckets['logbucket'].acl.to_config_dict()
assert acls == {
'grantSet': None,
'owner': {'displayName': None, 'id': OWNER}
}
# Add some Log Bucket ACLs:
log_acls = FakeAcl([
FakeGrant([FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")], "WRITE"),
FakeGrant([FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")], "READ_ACP"),
FakeGrant([FakeGrantee(id=OWNER)], "FULL_CONTROL")
])
s3_config_query.backends['global'].set_bucket_acl('logbucket', log_acls)
acls = s3_config_query.backends['global'].buckets['logbucket'].acl.to_config_dict()
assert acls == {
'grantSet': None,
'grantList': [{'grantee': 'LogDelivery', 'permission': 'Write'}, {'grantee': 'LogDelivery', 'permission': 'ReadAcp'}],
'owner': {'displayName': None, 'id': OWNER}
}
# Give the owner less than full_control permissions:
log_acls = FakeAcl([FakeGrant([FakeGrantee(id=OWNER)], "READ_ACP"), FakeGrant([FakeGrantee(id=OWNER)], "WRITE_ACP")])
s3_config_query.backends['global'].set_bucket_acl('logbucket', log_acls)
acls = s3_config_query.backends['global'].buckets['logbucket'].acl.to_config_dict()
assert acls == {
'grantSet': None,
'grantList': [
{'grantee': {'id': OWNER, 'displayName': None}, 'permission': 'ReadAcp'},
{'grantee': {'id': OWNER, 'displayName': None}, 'permission': 'WriteAcp'}
],
'owner': {'displayName': None, 'id': OWNER}
}
@mock_s3
def test_s3_config_dict():
from moto.s3.config import s3_config_query
from moto.s3.models import FakeAcl, FakeGrant, FakeGrantee, FakeTag, FakeTagging, FakeTagSet, OWNER
# Without any buckets:
assert not s3_config_query.get_config_resource('some_bucket')
tags = FakeTagging(FakeTagSet([FakeTag('someTag', 'someValue'), FakeTag('someOtherTag', 'someOtherValue')]))
# With 1 bucket in us-west-2:
s3_config_query.backends['global'].create_bucket('bucket1', 'us-west-2')
s3_config_query.backends['global'].put_bucket_tagging('bucket1', tags)
# With a log bucket:
s3_config_query.backends['global'].create_bucket('logbucket', 'us-west-2')
log_acls = FakeAcl([
FakeGrant([FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")], "WRITE"),
FakeGrant([FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")], "READ_ACP"),
FakeGrant([FakeGrantee(id=OWNER)], "FULL_CONTROL")
])
s3_config_query.backends['global'].set_bucket_acl('logbucket', log_acls)
s3_config_query.backends['global'].put_bucket_logging('bucket1', {'TargetBucket': 'logbucket', 'TargetPrefix': ''})
# Get the us-west-2 bucket and verify that it works properly:
bucket1_result = s3_config_query.get_config_resource('bucket1')
# Just verify a few things:
assert bucket1_result['arn'] == 'arn:aws:s3:::bucket1'
assert bucket1_result['awsRegion'] == 'us-west-2'
assert bucket1_result['resourceName'] == bucket1_result['resourceId'] == 'bucket1'
assert bucket1_result['tags'] == {'someTag': 'someValue', 'someOtherTag': 'someOtherValue'}
assert isinstance(bucket1_result['configuration'], str)
exist_list = ['AccessControlList', 'BucketAccelerateConfiguration', 'BucketLoggingConfiguration', 'BucketPolicy',
'IsRequesterPaysEnabled', 'BucketNotificationConfiguration']
for exist in exist_list:
assert isinstance(bucket1_result['supplementaryConfiguration'][exist], str)
# Verify the logging config:
assert json.loads(bucket1_result['supplementaryConfiguration']['BucketLoggingConfiguration']) == \
{'destinationBucketName': 'logbucket', 'logFilePrefix': ''}
# Verify the policy:
assert json.loads(bucket1_result['supplementaryConfiguration']['BucketPolicy']) == {'policyText': None}
# Filter by correct region:
assert bucket1_result == s3_config_query.get_config_resource('bucket1', resource_region='us-west-2')
# By incorrect region:
assert not s3_config_query.get_config_resource('bucket1', resource_region='eu-west-1')
# With correct resource ID and name:
assert bucket1_result == s3_config_query.get_config_resource('bucket1', resource_name='bucket1')
# With an incorrect resource name:
assert not s3_config_query.get_config_resource('bucket1', resource_name='eu-bucket-1')