Small fix for S3-AWS Config compatibility

- Small bug in tags with AWS Config
- Aggregated results lack "tags" in the result set
- Buckets also add a supplementary configuration of
"BucketTaggingConfiguration"
This commit is contained in:
Mike Grima 2019-10-29 14:35:13 -07:00
parent 6b67002a42
commit 503bc333ca
4 changed files with 27 additions and 6 deletions

View File

@ -744,8 +744,9 @@ class ConfigBackend(BaseBackend):
def list_aggregate_discovered_resources(self, aggregator_name, resource_type, filters, limit, next_token): def list_aggregate_discovered_resources(self, aggregator_name, resource_type, filters, limit, next_token):
"""This will query against the mocked AWS Config listing function that must exist for the resource backend. """This will query against the mocked AWS Config listing function that must exist for the resource backend.
As far a moto goes -- the only real difference between this function and the `list_discovered_resources` function is that
this will require a Config Aggregator be set up a priori and can search based on resource regions. As far a moto goes -- the only real difference between this function and the `list_discovered_resources` function is that
this will require a Config Aggregator be set up a priori and can search based on resource regions.
:param aggregator_name: :param aggregator_name:
:param resource_type: :param resource_type:
@ -796,9 +797,9 @@ class ConfigBackend(BaseBackend):
def get_resource_config_history(self, resource_type, id, backend_region): def get_resource_config_history(self, resource_type, id, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend. """Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!) NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!)
As such, the later_time, earlier_time, limit, and next_token are ignored as this will only As such, the later_time, earlier_time, limit, and next_token are ignored as this will only
return 1 item. (If no items, it raises an exception) return 1 item. (If no items, it raises an exception)
""" """
# If the type isn't implemented then we won't find the item: # If the type isn't implemented then we won't find the item:
if resource_type not in RESOURCE_MAP: if resource_type not in RESOURCE_MAP:
@ -897,6 +898,9 @@ class ConfigBackend(BaseBackend):
item['accountId'] = DEFAULT_ACCOUNT_ID item['accountId'] = DEFAULT_ACCOUNT_ID
# The 'tags' field is not included in aggregate results for some reason...
item.pop('tags', None)
found.append(item) found.append(item)
return {'BaseConfigurationItems': found, 'UnprocessedResourceIdentifiers': not_found} return {'BaseConfigurationItems': found, 'UnprocessedResourceIdentifiers': not_found}

View File

@ -934,6 +934,10 @@ class FakeBucket(BaseModel):
# This is a dobule-wrapped JSON for some reason... # This is a dobule-wrapped JSON for some reason...
s_config = {'AccessControlList': json.dumps(json.dumps(self.acl.to_config_dict()))} s_config = {'AccessControlList': json.dumps(json.dumps(self.acl.to_config_dict()))}
# Tagging is special:
if config_dict['tags']:
s_config['BucketTaggingConfiguration'] = json.dumps({'tagSets': [{'tags': config_dict['tags']}]})
# TODO implement Accelerate Configuration: # TODO implement Accelerate Configuration:
s_config['BucketAccelerateConfiguration'] = {'status': None} s_config['BucketAccelerateConfiguration'] = {'status': None}

View File

@ -1,3 +1,4 @@
import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
import boto3 import boto3
@ -1314,10 +1315,12 @@ def test_batch_get_aggregate_resource_config():
s3_client = boto3.client('s3', region_name='us-west-2') s3_client = boto3.client('s3', region_name='us-west-2')
for x in range(0, 10): for x in range(0, 10):
s3_client.create_bucket(Bucket='bucket{}'.format(x), CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}) s3_client.create_bucket(Bucket='bucket{}'.format(x), CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
s3_client.put_bucket_tagging(Bucket='bucket{}'.format(x), Tagging={'TagSet': [{'Key': 'Some', 'Value': 'Tag'}]})
s3_client_eu = boto3.client('s3', region_name='eu-west-1') s3_client_eu = boto3.client('s3', region_name='eu-west-1')
for x in range(10, 12): for x in range(10, 12):
s3_client_eu.create_bucket(Bucket='eu-bucket{}'.format(x), CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}) s3_client_eu.create_bucket(Bucket='eu-bucket{}'.format(x), CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'})
s3_client.put_bucket_tagging(Bucket='eu-bucket{}'.format(x), Tagging={'TagSet': [{'Key': 'Some', 'Value': 'Tag'}]})
# Now try with resources that exist and ones that don't: # Now try with resources that exist and ones that don't:
identifiers = [{'SourceAccountId': DEFAULT_ACCOUNT_ID, 'SourceRegion': 'us-west-2', 'ResourceType': 'AWS::S3::Bucket', identifiers = [{'SourceAccountId': DEFAULT_ACCOUNT_ID, 'SourceRegion': 'us-west-2', 'ResourceType': 'AWS::S3::Bucket',
@ -1339,6 +1342,11 @@ def test_batch_get_aggregate_resource_config():
assert not missing_buckets assert not missing_buckets
# Verify that 'tags' is not in the result set:
for b in result['BaseConfigurationItems']:
assert not b.get('tags')
assert json.loads(b['supplementaryConfiguration']['BucketTaggingConfiguration']) == {'tagSets': [{'tags': {'Some': 'Tag'}}]}
# Verify that if the resource name and ID are correct that things are good: # Verify that if the resource name and ID are correct that things are good:
identifiers = [{'SourceAccountId': DEFAULT_ACCOUNT_ID, 'SourceRegion': 'us-west-2', 'ResourceType': 'AWS::S3::Bucket', identifiers = [{'SourceAccountId': DEFAULT_ACCOUNT_ID, 'SourceRegion': 'us-west-2', 'ResourceType': 'AWS::S3::Bucket',
'ResourceId': 'bucket1', 'ResourceName': 'bucket1'}] 'ResourceId': 'bucket1', 'ResourceName': 'bucket1'}]

View File

@ -3718,6 +3718,8 @@ def test_s3_config_dict():
assert bucket1_result['awsRegion'] == 'us-west-2' assert bucket1_result['awsRegion'] == 'us-west-2'
assert bucket1_result['resourceName'] == bucket1_result['resourceId'] == 'bucket1' assert bucket1_result['resourceName'] == bucket1_result['resourceId'] == 'bucket1'
assert bucket1_result['tags'] == {'someTag': 'someValue', 'someOtherTag': 'someOtherValue'} assert bucket1_result['tags'] == {'someTag': 'someValue', 'someOtherTag': 'someOtherValue'}
assert json.loads(bucket1_result['supplementaryConfiguration']['BucketTaggingConfiguration']) == \
{'tagSets': [{'tags': bucket1_result['tags']}]}
assert isinstance(bucket1_result['configuration'], str) assert isinstance(bucket1_result['configuration'], str)
exist_list = ['AccessControlList', 'BucketAccelerateConfiguration', 'BucketLoggingConfiguration', 'BucketPolicy', exist_list = ['AccessControlList', 'BucketAccelerateConfiguration', 'BucketLoggingConfiguration', 'BucketPolicy',
'IsRequesterPaysEnabled', 'BucketNotificationConfiguration'] 'IsRequesterPaysEnabled', 'BucketNotificationConfiguration']
@ -3748,5 +3750,8 @@ def test_s3_config_dict():
assert not s3_config_query.get_config_resource('bucket1', resource_name='eu-bucket-1') assert not s3_config_query.get_config_resource('bucket1', resource_name='eu-bucket-1')
# Verify that no bucket policy returns the proper value: # Verify that no bucket policy returns the proper value:
assert json.loads(s3_config_query.get_config_resource('logbucket')['supplementaryConfiguration']['BucketPolicy']) == \ logging_bucket = s3_config_query.get_config_resource('logbucket')
assert json.loads(logging_bucket['supplementaryConfiguration']['BucketPolicy']) == \
{'policyText': None} {'policyText': None}
assert not logging_bucket['tags']
assert not logging_bucket['supplementaryConfiguration'].get('BucketTaggingConfiguration')