Add support for ConfigService APIs list_tags_for_resource, untag_resource, tag_resource (#4117)

Co-authored-by: Karri Balk <kbalk@users.noreply.github.com>
Authored by kbalk on 2021-08-03 07:45:26 -04:00; committed by GitHub
parent 013e3462aa
commit f364a050f0
5 changed files with 481 additions and 150 deletions
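
For orientation before the per-file hunks, the three new APIs can be exercised end to end with boto3 against moto's mocked Config backend. The sketch below is not part of the diff; it condenses the behaviour covered by the new test file at the end of this page, and the aggregator name, tag keys, and account ID are illustrative.

```python
import boto3

from moto.config import mock_config


@mock_config
def demo_tag_round_trip():
    client = boto3.client("config", region_name="us-east-1")

    # Create a taggable resource (a configuration aggregator) with two initial tags.
    arn = client.put_configuration_aggregator(
        ConfigurationAggregatorName="demo-aggregator",
        AccountAggregationSources=[
            {"AccountIds": ["123456789012"], "AllAwsRegions": True}
        ],
        Tags=[{"Key": "team", "Value": "infra"}, {"Key": "env", "Value": "dev"}],
    )["ConfigurationAggregator"]["ConfigurationAggregatorArn"]

    # tag_resource merges new keys into the existing tag set.
    client.tag_resource(ResourceArn=arn, Tags=[{"Key": "owner", "Value": "kbalk"}])
    assert len(client.list_tags_for_resource(ResourceArn=arn)["Tags"]) == 3

    # untag_resource removes by key; unknown keys are silently ignored.
    client.untag_resource(ResourceArn=arn, TagKeys=["env", "does-not-exist"])
    assert len(client.list_tags_for_resource(ResourceArn=arn)["Tags"]) == 2


demo_tag_round_trip()
```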

View File

@@ -2370,6 +2370,7 @@
- [X] batch_get_aggregate_resource_config
- [X] batch_get_resource_config
+- [ ] can_paginate
- [X] delete_aggregation_authorization
- [ ] delete_config_rule
- [X] delete_configuration_aggregator
@@ -2411,6 +2412,7 @@
- [ ] describe_remediation_exceptions
- [ ] describe_remediation_execution_status
- [ ] describe_retention_configurations
+- [ ] generate_presigned_url
- [ ] get_aggregate_compliance_details_by_config_rule
- [ ] get_aggregate_config_rule_compliance_summary
- [ ] get_aggregate_conformance_pack_compliance_summary
@@ -2425,12 +2427,14 @@
- [ ] get_discovered_resource_counts
- [ ] get_organization_config_rule_detailed_status
- [X] get_organization_conformance_pack_detailed_status
+- [ ] get_paginator
- [X] get_resource_config_history
- [ ] get_stored_query
+- [ ] get_waiter
- [X] list_aggregate_discovered_resources
- [X] list_discovered_resources
- [ ] list_stored_queries
-- [ ] list_tags_for_resource
+- [X] list_tags_for_resource
- [X] put_aggregation_authorization
- [ ] put_config_rule
- [X] put_configuration_aggregator
@@ -2452,8 +2456,8 @@
- [X] start_configuration_recorder
- [ ] start_remediation_execution
- [X] stop_configuration_recorder
-- [ ] tag_resource
+- [X] tag_resource
-- [ ] untag_resource
+- [X] untag_resource
</details>
## connect

View File

@@ -12,7 +12,7 @@ class NameTooLongException(JsonRESTError):
name=name, location=location
)
)
-super(NameTooLongException, self).__init__("ValidationException", message)
+super().__init__("ValidationException", message)
class InvalidConfigurationRecorderNameException(JsonRESTError):
@@ -22,9 +22,7 @@ class InvalidConfigurationRecorderNameException(JsonRESTError):
message = "The configuration recorder name '{name}' is not valid, blank string.".format(
name=name
)
-super(InvalidConfigurationRecorderNameException, self).__init__(
-"InvalidConfigurationRecorderNameException", message
-)
+super().__init__("InvalidConfigurationRecorderNameException", message)
class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError):
@@ -35,9 +33,7 @@ class MaxNumberOfConfigurationRecordersExceededException(JsonRESTError):
"Failed to put configuration recorder '{name}' because the maximum number of "
"configuration recorders: 1 is reached.".format(name=name)
)
-super(MaxNumberOfConfigurationRecordersExceededException, self).__init__(
-"MaxNumberOfConfigurationRecordersExceededException", message
-)
+super().__init__("MaxNumberOfConfigurationRecordersExceededException", message)
class InvalidRecordingGroupException(JsonRESTError):
@@ -45,9 +41,7 @@ class InvalidRecordingGroupException(JsonRESTError):
def __init__(self):
message = "The recording group provided is not valid"
-super(InvalidRecordingGroupException, self).__init__(
-"InvalidRecordingGroupException", message
-)
+super().__init__("InvalidRecordingGroupException", message)
class InvalidResourceTypeException(JsonRESTError):
@@ -64,9 +58,7 @@ class InvalidResourceTypeException(JsonRESTError):
# For PY2:
message = str(message)
-super(InvalidResourceTypeException, self).__init__(
-"ValidationException", message
-)
+super().__init__("ValidationException", message)
class NoSuchConfigurationAggregatorException(JsonRESTError):
@@ -80,9 +72,7 @@ class NoSuchConfigurationAggregatorException(JsonRESTError):
"At least one of the configuration aggregators does not exist. Check the configuration aggregator"
" names and try again."
)
-super(NoSuchConfigurationAggregatorException, self).__init__(
-"NoSuchConfigurationAggregatorException", message
-)
+super().__init__("NoSuchConfigurationAggregatorException", message)
class NoSuchConfigurationRecorderException(JsonRESTError):
@@ -92,9 +82,7 @@ class NoSuchConfigurationRecorderException(JsonRESTError):
message = "Cannot find configuration recorder with the specified name '{name}'.".format(
name=name
)
-super(NoSuchConfigurationRecorderException, self).__init__(
-"NoSuchConfigurationRecorderException", message
-)
+super().__init__("NoSuchConfigurationRecorderException", message)
class InvalidDeliveryChannelNameException(JsonRESTError):
@@ -104,9 +92,7 @@ class InvalidDeliveryChannelNameException(JsonRESTError):
message = "The delivery channel name '{name}' is not valid, blank string.".format(
name=name
)
-super(InvalidDeliveryChannelNameException, self).__init__(
-"InvalidDeliveryChannelNameException", message
-)
+super().__init__("InvalidDeliveryChannelNameException", message)
class NoSuchBucketException(JsonRESTError):
@@ -116,7 +102,7 @@ class NoSuchBucketException(JsonRESTError):
def __init__(self):
message = "Cannot find a S3 bucket with an empty bucket name."
-super(NoSuchBucketException, self).__init__("NoSuchBucketException", message)
+super().__init__("NoSuchBucketException", message)
class InvalidNextTokenException(JsonRESTError):
@@ -124,9 +110,7 @@ class InvalidNextTokenException(JsonRESTError):
def __init__(self):
message = "The nextToken provided is invalid"
-super(InvalidNextTokenException, self).__init__(
-"InvalidNextTokenException", message
-)
+super().__init__("InvalidNextTokenException", message)
class InvalidS3KeyPrefixException(JsonRESTError):
@@ -134,9 +118,7 @@ class InvalidS3KeyPrefixException(JsonRESTError):
def __init__(self):
message = "The s3 key prefix '' is not valid, empty s3 key prefix."
-super(InvalidS3KeyPrefixException, self).__init__(
-"InvalidS3KeyPrefixException", message
-)
+super().__init__("InvalidS3KeyPrefixException", message)
class InvalidSNSTopicARNException(JsonRESTError):
@@ -146,9 +128,7 @@ class InvalidSNSTopicARNException(JsonRESTError):
def __init__(self):
message = "The sns topic arn '' is not valid."
-super(InvalidSNSTopicARNException, self).__init__(
-"InvalidSNSTopicARNException", message
-)
+super().__init__("InvalidSNSTopicARNException", message)
class InvalidDeliveryFrequency(JsonRESTError):
@@ -162,9 +142,7 @@ class InvalidDeliveryFrequency(JsonRESTError):
value=value, good_list=good_list
)
)
-super(InvalidDeliveryFrequency, self).__init__(
-"InvalidDeliveryFrequency", message
-)
+super().__init__("InvalidDeliveryFrequency", message)
class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError):
@@ -175,9 +153,7 @@ class MaxNumberOfDeliveryChannelsExceededException(JsonRESTError):
"Failed to put delivery channel '{name}' because the maximum number of "
"delivery channels: 1 is reached.".format(name=name)
)
-super(MaxNumberOfDeliveryChannelsExceededException, self).__init__(
-"MaxNumberOfDeliveryChannelsExceededException", message
-)
+super().__init__("MaxNumberOfDeliveryChannelsExceededException", message)
class NoSuchDeliveryChannelException(JsonRESTError):
@@ -187,9 +163,7 @@ class NoSuchDeliveryChannelException(JsonRESTError):
message = "Cannot find delivery channel with specified name '{name}'.".format(
name=name
)
-super(NoSuchDeliveryChannelException, self).__init__(
-"NoSuchDeliveryChannelException", message
-)
+super().__init__("NoSuchDeliveryChannelException", message)
class NoAvailableConfigurationRecorderException(JsonRESTError):
@@ -197,9 +171,7 @@ class NoAvailableConfigurationRecorderException(JsonRESTError):
def __init__(self):
message = "Configuration recorder is not available to put delivery channel."
-super(NoAvailableConfigurationRecorderException, self).__init__(
-"NoAvailableConfigurationRecorderException", message
-)
+super().__init__("NoAvailableConfigurationRecorderException", message)
class NoAvailableDeliveryChannelException(JsonRESTError):
@@ -207,9 +179,7 @@ class NoAvailableDeliveryChannelException(JsonRESTError):
def __init__(self):
message = "Delivery channel is not available to start configuration recorder."
-super(NoAvailableDeliveryChannelException, self).__init__(
-"NoAvailableDeliveryChannelException", message
-)
+super().__init__("NoAvailableDeliveryChannelException", message)
class LastDeliveryChannelDeleteFailedException(JsonRESTError):
@@ -220,9 +190,7 @@ class LastDeliveryChannelDeleteFailedException(JsonRESTError):
"Failed to delete last specified delivery channel with name '{name}', because there, "
"because there is a running configuration recorder.".format(name=name)
)
-super(LastDeliveryChannelDeleteFailedException, self).__init__(
-"LastDeliveryChannelDeleteFailedException", message
-)
+super().__init__("LastDeliveryChannelDeleteFailedException", message)
class TooManyAccountSources(JsonRESTError):
@@ -237,14 +205,14 @@ class TooManyAccountSources(JsonRESTError):
locations=", ".join(locations)
)
)
-super(TooManyAccountSources, self).__init__("ValidationException", message)
+super().__init__("ValidationException", message)
class DuplicateTags(JsonRESTError):
code = 400
def __init__(self):
-super(DuplicateTags, self).__init__(
+super().__init__(
"InvalidInput",
"Duplicate tag keys found. Please note that Tag keys are case insensitive.",
)
@@ -254,7 +222,7 @@ class TagKeyTooBig(JsonRESTError):
code = 400
def __init__(self, tag, param="tags.X.member.key"):
-super(TagKeyTooBig, self).__init__(
+super().__init__(
"ValidationException",
"1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 128".format(
@@ -267,7 +235,7 @@ class TagValueTooBig(JsonRESTError):
code = 400
def __init__(self, tag):
-super(TagValueTooBig, self).__init__(
+super().__init__(
"ValidationException",
"1 validation error detected: Value '{}' at 'tags.X.member.value' failed to satisfy "
"constraint: Member must have length less than or equal to 256".format(tag),
@@ -278,9 +246,7 @@ class InvalidParameterValueException(JsonRESTError):
code = 400
def __init__(self, message):
-super(InvalidParameterValueException, self).__init__(
-"InvalidParameterValueException", message
-)
+super().__init__("InvalidParameterValueException", message)
class InvalidTagCharacters(JsonRESTError):
@@ -292,14 +258,14 @@ class InvalidTagCharacters(JsonRESTError):
)
message += "constraint: Member must satisfy regular expression pattern: [\\\\p{L}\\\\p{Z}\\\\p{N}_.:/=+\\\\-@]+"
-super(InvalidTagCharacters, self).__init__("ValidationException", message)
+super().__init__("ValidationException", message)
class TooManyTags(JsonRESTError):
code = 400
def __init__(self, tags, param="tags"):
-super(TooManyTags, self).__init__(
+super().__init__(
"ValidationException",
"1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 50.".format(
@@ -312,19 +278,19 @@ class InvalidResourceParameters(JsonRESTError):
code = 400
def __init__(self):
-super(InvalidResourceParameters, self).__init__(
+super().__init__(
"ValidationException",
"Both Resource ID and Resource Name " "cannot be specified in the request",
)
-class InvalidLimit(JsonRESTError):
+class InvalidLimitException(JsonRESTError):
code = 400
def __init__(self, value):
-super(InvalidLimit, self).__init__(
-"ValidationException",
-"Value '{value}' at 'limit' failed to satisify constraint: Member"
+super().__init__(
+"InvalidLimitException",
+"Value '{value}' at 'limit' failed to satisfy constraint: Member"
" must have value less than or equal to 100".format(value=value),
)
@@ -333,7 +299,7 @@ class TooManyResourceIds(JsonRESTError):
code = 400
def __init__(self):
-super(TooManyResourceIds, self).__init__(
+super().__init__(
"ValidationException",
"The specified list had more than 20 resource ID's. "
"It must have '20' or less items",
@@ -343,11 +309,23 @@ class TooManyResourceIds(JsonRESTError):
class ResourceNotDiscoveredException(JsonRESTError):
code = 400
-def __init__(self, type, resource):
-super(ResourceNotDiscoveredException, self).__init__(
+def __init__(self, resource_type, resource):
+super().__init__(
"ResourceNotDiscoveredException",
"Resource {resource} of resourceType:{type} is unknown or has not been "
-"discovered".format(resource=resource, type=type),
+"discovered".format(resource=resource, type=resource_type),
)
class ResourceNotFoundException(JsonRESTError):
code = 400
def __init__(self, resource_arn):
super().__init__(
"ResourceNotFoundException",
"ResourceArn '{resource_arn}' does not exist".format(
resource_arn=resource_arn
),
)
@@ -365,7 +343,7 @@ class TooManyResourceKeys(JsonRESTError):
# For PY2:
message = str(message)
-super(TooManyResourceKeys, self).__init__("ValidationException", message)
+super().__init__("ValidationException", message)
class InvalidResultTokenException(JsonRESTError):
@@ -373,22 +351,18 @@ class InvalidResultTokenException(JsonRESTError):
def __init__(self):
message = "The resultToken provided is invalid"
-super(InvalidResultTokenException, self).__init__(
-"InvalidResultTokenException", message
-)
+super().__init__("InvalidResultTokenException", message)
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
-super(ValidationException, self).__init__("ValidationException", message)
+super().__init__("ValidationException", message)
class NoSuchOrganizationConformancePackException(JsonRESTError):
code = 400
def __init__(self, message):
-super(NoSuchOrganizationConformancePackException, self).__init__(
-"NoSuchOrganizationConformancePackException", message
-)
+super().__init__("NoSuchOrganizationConformancePackException", message)
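
The user-visible effect of the new ResourceNotFoundException and of renaming InvalidLimit to InvalidLimitException (which now carries its own error code instead of ValidationException) is the code a boto3 caller gets back. A minimal sketch, mirroring the assertions in the new test file rather than adding anything to the commit:

```python
import boto3
from botocore.exceptions import ClientError

from moto.config import mock_config


@mock_config
def show_error_codes():
    client = boto3.client("config", region_name="us-east-1")

    try:
        # No aggregator or aggregation authorization has this ARN, so the backend
        # raises the new ResourceNotFoundException.
        client.list_tags_for_resource(ResourceArn="bad_arn")
    except ClientError as err:
        print(err.response["Error"]["Code"])  # ResourceNotFoundException

    try:
        # The limit is validated before the ARN lookup; values above 100 raise the
        # renamed InvalidLimitException (formerly InvalidLimit with a ValidationException code).
        client.list_tags_for_resource(ResourceArn="bad_arn", Limit=101)
    except ClientError as err:
        print(err.response["Error"]["Code"])  # InvalidLimitException


show_error_codes()
```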

View File

@@ -1,11 +1,11 @@
import json
import re
import time
-import pkg_resources
import random
import string
from datetime import datetime
+import pkg_resources
from boto3 import Session
@@ -35,10 +35,11 @@ from moto.config.exceptions import (
NoSuchConfigurationAggregatorException,
InvalidTagCharacters,
DuplicateTags,
-InvalidLimit,
+InvalidLimitException,
InvalidResourceParameters,
TooManyResourceIds,
ResourceNotDiscoveredException,
+ResourceNotFoundException,
TooManyResourceKeys,
InvalidResultTokenException,
ValidationException,
@@ -69,6 +70,8 @@ RESOURCE_MAP = {
"AWS::IAM::Policy": policy_config_query,
}
+MAX_TAGS_IN_ARG = 50
def datetime2int(date):
return int(time.mktime(date.timetuple()))
@@ -93,7 +96,7 @@ def snake_to_camels(original, cap_start, cap_arn):
def random_string():
"""Returns a random set of 8 lowercase letters for the Config Aggregator ARN"""
chars = []
-for x in range(0, 8):
+for _ in range(0, 8):
chars.append(random.choice(string.ascii_lowercase))
return "".join(chars)
@@ -115,7 +118,7 @@ def validate_tag_key(tag_key, exception_param="tags.X.member.key"):
# [\w\s_.:/=+\-@]+ SHOULD be the same as the Java regex on the AWS documentation: [\p{L}\p{Z}\p{N}_.:/=+\-@]+
match = re.findall(r"[\w\s_.:/=+\-@]+", tag_key)
# Kudos if you can come up with a better way of doing a global search :)
-if not len(match) or len(match[0]) < len(tag_key):
+if not match or len(match[0]) < len(tag_key):
raise InvalidTagCharacters(tag_key, param=exception_param)
@@ -133,7 +136,7 @@ def check_tag_duplicate(all_tags, tag_key):
def validate_tags(tags):
proper_tags = {}
-if len(tags) > 50:
+if len(tags) > MAX_TAGS_IN_ARG:
raise TooManyTags(tags)
for tag in tags:
@@ -188,7 +191,7 @@ class ConfigEmptyDictable(BaseModel):
class ConfigRecorderStatus(ConfigEmptyDictable):
def __init__(self, name):
-super(ConfigRecorderStatus, self).__init__()
+super().__init__()
self.name = name
self.recording = False
@@ -213,7 +216,7 @@ class ConfigRecorderStatus(ConfigEmptyDictable):
class ConfigDeliverySnapshotProperties(ConfigEmptyDictable):
def __init__(self, delivery_frequency):
-super(ConfigDeliverySnapshotProperties, self).__init__()
+super().__init__()
self.delivery_frequency = delivery_frequency
@@ -222,7 +225,7 @@ class ConfigDeliveryChannel(ConfigEmptyDictable):
def __init__(
self, name, s3_bucket_name, prefix=None, sns_arn=None, snapshot_properties=None
):
-super(ConfigDeliveryChannel, self).__init__()
+super().__init__()
self.name = name
self.s3_bucket_name = s3_bucket_name
@@ -238,7 +241,7 @@ class RecordingGroup(ConfigEmptyDictable):
include_global_resource_types=False,
resource_types=None,
):
-super(RecordingGroup, self).__init__()
+super().__init__()
self.all_supported = all_supported
self.include_global_resource_types = include_global_resource_types
@@ -247,7 +250,7 @@ class RecordingGroup(ConfigEmptyDictable):
class ConfigRecorder(ConfigEmptyDictable):
def __init__(self, role_arn, recording_group, name="default", status=None):
-super(ConfigRecorder, self).__init__()
+super().__init__()
self.name = name
self.role_arn = role_arn
@@ -261,7 +264,7 @@ class ConfigRecorder(ConfigEmptyDictable):
class AccountAggregatorSource(ConfigEmptyDictable):
def __init__(self, account_ids, aws_regions=None, all_aws_regions=None):
-super(AccountAggregatorSource, self).__init__(capitalize_start=True)
+super().__init__(capitalize_start=True)
# Can't have both the regions and all_regions flag present -- also can't have them both missing:
if aws_regions and all_aws_regions:
@@ -287,9 +290,7 @@ class AccountAggregatorSource(ConfigEmptyDictable):
class OrganizationAggregationSource(ConfigEmptyDictable):
def __init__(self, role_arn, aws_regions=None, all_aws_regions=None):
-super(OrganizationAggregationSource, self).__init__(
-capitalize_start=True, capitalize_arn=False
-)
+super().__init__(capitalize_start=True, capitalize_arn=False)
# Can't have both the regions and all_regions flag present -- also can't have them both missing:
if aws_regions and all_aws_regions:
@@ -315,9 +316,7 @@ class OrganizationAggregationSource(ConfigEmptyDictable):
class ConfigAggregator(ConfigEmptyDictable):
def __init__(self, name, region, account_sources=None, org_source=None, tags=None):
-super(ConfigAggregator, self).__init__(
-capitalize_start=True, capitalize_arn=False
-)
+super().__init__(capitalize_start=True, capitalize_arn=False)
self.configuration_aggregator_name = name
self.configuration_aggregator_arn = "arn:aws:config:{region}:{id}:config-aggregator/config-aggregator-{random}".format(
@@ -328,12 +327,12 @@ class ConfigAggregator(ConfigEmptyDictable):
self.creation_time = datetime2int(datetime.utcnow())
self.last_updated_time = datetime2int(datetime.utcnow())
-# Tags are listed in the list_tags_for_resource API call ... not implementing yet -- please feel free to!
+# Tags are listed in the list_tags_for_resource API call.
self.tags = tags or {}
# Override the to_dict so that we can format the tags properly...
def to_dict(self):
-result = super(ConfigAggregator, self).to_dict()
+result = super().to_dict()
# Override the account aggregation sources if present:
if self.account_aggregation_sources:
@@ -341,9 +340,10 @@ class ConfigAggregator(ConfigEmptyDictable):
a.to_dict() for a in self.account_aggregation_sources
]
-# Tags are listed in the list_tags_for_resource API call ... not implementing yet -- please feel free to!
-# if self.tags:
-# result['Tags'] = [{'Key': key, 'Value': value} for key, value in self.tags.items()]
+if self.tags:
+result["Tags"] = [
+{"Key": key, "Value": value} for key, value in self.tags.items()
+]
return result
@@ -352,9 +352,7 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable):
def __init__(
self, current_region, authorized_account_id, authorized_aws_region, tags=None
):
-super(ConfigAggregationAuthorization, self).__init__(
-capitalize_start=True, capitalize_arn=False
-)
+super().__init__(capitalize_start=True, capitalize_arn=False)
self.aggregation_authorization_arn = (
"arn:aws:config:{region}:{id}:aggregation-authorization/"
@@ -369,7 +367,7 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable):
self.authorized_aws_region = authorized_aws_region
self.creation_time = datetime2int(datetime.utcnow())
-# Tags are listed in the list_tags_for_resource API call ... not implementing yet -- please feel free to!
+# Tags are listed in the list_tags_for_resource API call.
self.tags = tags or {}
@@ -383,9 +381,7 @@ class OrganizationConformancePack(ConfigEmptyDictable):
input_parameters=None,
excluded_accounts=None,
):
-super(OrganizationConformancePack, self).__init__(
-capitalize_start=True, capitalize_arn=False
-)
+super().__init__(capitalize_start=True, capitalize_arn=False)
self._status = "CREATE_SUCCESSFUL"
self._unique_pack_name = "{0}-{1}".format(name, random_string())
@@ -508,12 +504,12 @@ class ConfigBackend(BaseBackend):
)
account_sources = []
-for a in config_aggregator["AccountAggregationSources"]:
+for source in config_aggregator["AccountAggregationSources"]:
account_sources.append(
AccountAggregatorSource(
-a["AccountIds"],
-aws_regions=a.get("AwsRegions"),
-all_aws_regions=a.get("AllAwsRegions"),
+source["AccountIds"],
+aws_regions=source.get("AwsRegions"),
+all_aws_regions=source.get("AllAwsRegions"),
)
)
@@ -688,32 +684,32 @@ class ConfigBackend(BaseBackend):
if config_recorder.get("recordingGroup") is None:
recording_group = RecordingGroup()
else:
-rg = config_recorder["recordingGroup"]
+rgroup = config_recorder["recordingGroup"]
# If an empty dict is passed in, then bad:
-if not rg:
+if not rgroup:
raise InvalidRecordingGroupException()
# Can't have both the resource types specified and the other flags as True.
-if rg.get("resourceTypes") and (
-rg.get("allSupported", False)
-or rg.get("includeGlobalResourceTypes", False)
+if rgroup.get("resourceTypes") and (
+rgroup.get("allSupported", False)
+or rgroup.get("includeGlobalResourceTypes", False)
):
raise InvalidRecordingGroupException()
# Must supply resourceTypes if 'allSupported' is not supplied:
-if not rg.get("allSupported") and not rg.get("resourceTypes"):
+if not rgroup.get("allSupported") and not rgroup.get("resourceTypes"):
raise InvalidRecordingGroupException()
# Validate that the list provided is correct:
-self._validate_resource_types(rg.get("resourceTypes", []))
+self._validate_resource_types(rgroup.get("resourceTypes", []))
recording_group = RecordingGroup(
-all_supported=rg.get("allSupported", True),
-include_global_resource_types=rg.get(
+all_supported=rgroup.get("allSupported", True),
+include_global_resource_types=rgroup.get(
"includeGlobalResourceTypes", False
),
-resource_types=rg.get("resourceTypes", []),
+resource_types=rgroup.get("resourceTypes", []),
)
self.recorders[config_recorder["name"]] = ConfigRecorder(
@@ -727,12 +723,12 @@ class ConfigBackend(BaseBackend):
recorders = []
if recorder_names:
-for rn in recorder_names:
-if not self.recorders.get(rn):
-raise NoSuchConfigurationRecorderException(rn)
+for rname in recorder_names:
+if not self.recorders.get(rname):
+raise NoSuchConfigurationRecorderException(rname)
# Format the recorder:
-recorders.append(self.recorders[rn].to_dict())
+recorders.append(self.recorders[rname].to_dict())
else:
for recorder in self.recorders.values():
@@ -744,12 +740,12 @@ class ConfigBackend(BaseBackend):
recorders = []
if recorder_names:
-for rn in recorder_names:
-if not self.recorders.get(rn):
-raise NoSuchConfigurationRecorderException(rn)
+for rname in recorder_names:
+if not self.recorders.get(rname):
+raise NoSuchConfigurationRecorderException(rname)
# Format the recorder:
-recorders.append(self.recorders[rn].status.to_dict())
+recorders.append(self.recorders[rname].status.to_dict())
else:
for recorder in self.recorders.values():
@@ -790,7 +786,7 @@ class ConfigBackend(BaseBackend):
raise MaxNumberOfDeliveryChannelsExceededException(delivery_channel["name"])
if not delivery_channel.get("configSnapshotDeliveryProperties"):
-dp = None
+dprop = None
else:
# Validate the config snapshot delivery properties:
@@ -798,7 +794,7 @@ class ConfigBackend(BaseBackend):
delivery_channel["configSnapshotDeliveryProperties"]
)
-dp = ConfigDeliverySnapshotProperties(
+dprop = ConfigDeliverySnapshotProperties(
delivery_channel["configSnapshotDeliveryProperties"][
"deliveryFrequency"
]
@@ -809,19 +805,19 @@ class ConfigBackend(BaseBackend):
delivery_channel["s3BucketName"],
prefix=delivery_channel.get("s3KeyPrefix", None),
sns_arn=delivery_channel.get("snsTopicARN", None),
-snapshot_properties=dp,
+snapshot_properties=dprop,
)
def describe_delivery_channels(self, channel_names):
channels = []
if channel_names:
-for cn in channel_names:
-if not self.delivery_channels.get(cn):
-raise NoSuchDeliveryChannelException(cn)
+for cname in channel_names:
+if not self.delivery_channels.get(cname):
+raise NoSuchDeliveryChannelException(cname)
# Format the delivery channel:
-channels.append(self.delivery_channels[cn].to_dict())
+channels.append(self.delivery_channels[cname].to_dict())
else:
for channel in self.delivery_channels.values():
@@ -888,7 +884,7 @@ class ConfigBackend(BaseBackend):
limit = limit or DEFAULT_PAGE_SIZE
if limit > DEFAULT_PAGE_SIZE:
-raise InvalidLimit(limit)
+raise InvalidLimitException(limit)
if resource_ids and resource_name:
raise InvalidResourceParameters()
@@ -962,7 +958,7 @@ class ConfigBackend(BaseBackend):
limit = limit or DEFAULT_PAGE_SIZE
if limit > DEFAULT_PAGE_SIZE:
-raise InvalidLimit(limit)
+raise InvalidLimitException(limit)
# If the resource type exists and the backend region is implemented in moto, then
# call upon the resource type's Config Query class to retrieve the list of resources that match the criteria:
@@ -1003,7 +999,7 @@ class ConfigBackend(BaseBackend):
return result
-def get_resource_config_history(self, resource_type, id, backend_region):
+def get_resource_config_history(self, resource_type, resource_id, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!)
@@ -1012,7 +1008,7 @@ class ConfigBackend(BaseBackend):
"""
# If the type isn't implemented then we won't find the item:
if resource_type not in RESOURCE_MAP:
-raise ResourceNotDiscoveredException(resource_type, id)
+raise ResourceNotDiscoveredException(resource_type, resource_id)
# Is the resource type global?
backend_query_region = (
@@ -1023,14 +1019,14 @@ class ConfigBackend(BaseBackend):
# If the backend region isn't implemented then we won't find the item:
if not RESOURCE_MAP[resource_type].backends.get(backend_region):
-raise ResourceNotDiscoveredException(resource_type, id)
+raise ResourceNotDiscoveredException(resource_type, resource_id)
# Get the item:
item = RESOURCE_MAP[resource_type].get_config_resource(
-id, backend_region=backend_query_region
+resource_id, backend_region=backend_query_region
)
if not item:
-raise ResourceNotDiscoveredException(resource_type, id)
+raise ResourceNotDiscoveredException(resource_type, resource_id)
item["accountId"] = DEFAULT_ACCOUNT_ID
@@ -1288,11 +1284,85 @@ class ConfigBackend(BaseBackend):
self.organization_conformance_packs.pop(name)
def _match_arn(self, resource_arn):
"""Return config instance that has a matching ARN."""
# The allowed resources are ConfigRule, ConfigurationAggregator,
# and AggregatorAuthorization. ConfigRule isn't currently
# supported.
allowed_resources = [
{
"configs": self.config_aggregators,
"arn_attribute": "configuration_aggregator_arn",
},
{
"configs": self.aggregation_authorizations,
"arn_attribute": "aggregation_authorization_arn",
},
]
# Find matching config for given resource_arn among all the
# allowed config resources.
matched_config = None
for resource in allowed_resources:
for config in resource["configs"].values():
if resource_arn == getattr(config, resource["arn_attribute"]):
matched_config = config
break
if not matched_config:
raise ResourceNotFoundException(resource_arn)
return matched_config
def tag_resource(self, resource_arn, tags):
"""Add tags in config with a matching ARN."""
# Tag validation:
tags = validate_tags(tags)
# Find config with a matching ARN.
matched_config = self._match_arn(resource_arn)
# Merge the new tags with the existing tags.
matched_config.tags.update(tags)
def untag_resource(self, resource_arn, tag_keys):
"""Remove tags in config with a matching ARN.
If the tags in the tag_keys don't match any keys for that
ARN, they're just ignored.
"""
if len(tag_keys) > MAX_TAGS_IN_ARG:
raise TooManyTags(tag_keys)
# Find config with a matching ARN.
matched_config = self._match_arn(resource_arn)
for tag_key in tag_keys:
matched_config.tags.pop(tag_key, None)
def list_tags_for_resource(self, resource_arn, limit, next_token):
"""Return list of tags for AWS Config resource."""
# The limit argument is essentially ignored as a config instance
# can only have 50 tags, but we'll check the argument anyway.
# Although the boto3 documentation indicates the limit is 50, boto3
# accepts a limit value up to 100 as does the AWS CLI.
limit = limit or DEFAULT_PAGE_SIZE
if limit > DEFAULT_PAGE_SIZE:
raise InvalidLimitException(limit)
matched_config = self._match_arn(resource_arn)
return {
"Tags": [{"Key": k, "Value": v} for k, v in matched_config.tags.items()]
}
config_backends = {}
-for region in Session().get_available_regions("config"):
-config_backends[region] = ConfigBackend()
-for region in Session().get_available_regions("config", partition_name="aws-us-gov"):
-config_backends[region] = ConfigBackend()
-for region in Session().get_available_regions("config", partition_name="aws-cn"):
-config_backends[region] = ConfigBackend()
+for available_region in Session().get_available_regions("config"):
+config_backends[available_region] = ConfigBackend()
+for available_region in Session().get_available_regions(
+"config", partition_name="aws-us-gov"
+):
+config_backends[available_region] = ConfigBackend()
+for available_region in Session().get_available_regions(
+"config", partition_name="aws-cn"
+):
+config_backends[available_region] = ConfigBackend()
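
The tag bookkeeping added above stays deliberately simple: tags live on each model object as a plain dict, `tag_resource()` merges into it, `untag_resource()` pops keys with a default, and `list_tags_for_resource()` converts the dict back into the API's list-of-pairs shape. A small sketch of that round trip, assuming (as the hunks above imply) that `validate_tags()` is importable from `moto.config.models` and returns a key-to-value dict; the tag names are made up.

```python
from moto.config.models import validate_tags

# Wire format used by the API: a list of {"Key": ..., "Value": ...} pairs.
wire_tags = [{"Key": "team", "Value": "infra"}, {"Key": "env", "Value": "dev"}]

# validate_tags() enforces the key/value rules and flattens the list into a dict,
# which is the shape stored on ConfigAggregator / ConfigAggregationAuthorization.
internal = validate_tags(wire_tags)  # {'team': 'infra', 'env': 'dev'}

# tag_resource(): merge new or updated keys into the stored dict.
internal.update({"owner": "kbalk"})

# untag_resource(): unknown keys are ignored thanks to the pop() default.
internal.pop("does-not-exist", None)

# list_tags_for_resource(): convert back to the list-of-pairs response shape.
print([{"Key": key, "Value": value} for key, value in internal.items()])
```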

View File

@@ -133,6 +133,14 @@ class ConfigResponse(BaseResponse):
)
return json.dumps(schema)
def list_tags_for_resource(self):
schema = self.config_backend.list_tags_for_resource(
self._get_param("ResourceArn"),
self._get_param("Limit"),
self._get_param("NextToken"),
)
return json.dumps(schema)
def get_resource_config_history(self):
schema = self.config_backend.get_resource_config_history(
self._get_param("resourceType"), self._get_param("resourceId"), self.region
@@ -202,3 +210,15 @@ class ConfigResponse(BaseResponse):
)
return ""
def tag_resource(self):
self.config_backend.tag_resource(
self._get_param("ResourceArn"), self._get_param("Tags"),
)
return ""
def untag_resource(self):
self.config_backend.untag_resource(
self._get_param("ResourceArn"), self._get_param("TagKeys"),
)
return ""

View File

@@ -0,0 +1,263 @@
"""Unit tests specific to the tag-related ConfigService APIs.
These APIs include:
list_tags_for_resource
tag_resource
untag_resource
"""
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
import pytest
from moto.config import mock_config
from moto.config.models import MAX_TAGS_IN_ARG
from moto.config.models import random_string
from moto.core import ACCOUNT_ID
TEST_REGION = "us-east-1"
def config_aggregators_info(client):
"""Return list of dicts of ConfigAggregators ARNs and tags.
One ConfigAggregator would do, but this tests that a list of
configs can be handled by the caller.
"""
config_aggs = []
for idx in range(3):
tags = [
{"Key": f"{x}", "Value": f"{x}"} for x in range(idx * 10, idx * 10 + 10)
]
response = client.put_configuration_aggregator(
ConfigurationAggregatorName=f"testing_{idx}_{random_string()}",
AccountAggregationSources=[
{"AccountIds": [ACCOUNT_ID], "AllAwsRegions": True}
],
Tags=tags,
)
config_info = response["ConfigurationAggregator"]
config_aggs.append(
{"arn": config_info["ConfigurationAggregatorArn"], "tags": tags,}
)
return config_aggs
@mock_config
def test_tag_resource():
"""Test the ConfigSource API tag_resource()."""
client = boto3.client("config", region_name=TEST_REGION)
# Try an ARN when there are no configs instantiated.
no_config_arn = "no_configs"
with pytest.raises(ClientError) as cerr:
client.tag_resource(
ResourceArn=no_config_arn, Tags=[{"Key": "test_key", "Value": "test_value"}]
)
assert cerr.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
f"ResourceArn '{no_config_arn}' does not exist"
in cerr.value.response["Error"]["Message"]
)
# Try an invalid ARN.
bad_arn = "bad_arn"
with pytest.raises(ClientError) as cerr:
client.tag_resource(
ResourceArn=bad_arn, Tags=[{"Key": "test_key", "Value": "test_value"}]
)
assert cerr.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
f"ResourceArn '{bad_arn}' does not exist"
in cerr.value.response["Error"]["Message"]
)
# Create some configs and use the ARN from one of them for testing the
# tags argument.
config_aggs = config_aggregators_info(client)
good_arn = config_aggs[1]["arn"]
# Try specifying more than 50 keys.
with pytest.raises(ClientError) as cerr:
client.tag_resource(
ResourceArn=good_arn,
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(MAX_TAGS_IN_ARG + 1)],
)
assert cerr.value.response["Error"]["Code"] == "ValidationException"
assert (
"at 'tags' failed to satisfy constraint: Member must have length "
"less than or equal to 50"
) in cerr.value.response["Error"]["Message"]
# Try specifying an invalid key.
with pytest.raises(ParamValidationError) as cerr:
client.tag_resource(ResourceArn=good_arn, Tags=[{"Test": "abc"}])
assert cerr.typename == "ParamValidationError"
assert 'Unknown parameter in Tags[0]: "Test", must be one of: Key, Value' in str(
cerr
)
# Verify keys added to ConfigurationAggregator.
rsp = client.list_tags_for_resource(ResourceArn=good_arn)
tags = rsp["Tags"]
new_tags = [{"Key": "test_key", "Value": "test_value"}]
client.tag_resource(ResourceArn=good_arn, Tags=new_tags)
tags.extend(new_tags)
updated_rsp = client.list_tags_for_resource(ResourceArn=good_arn)
assert tags == updated_rsp["Tags"]
# Verify keys added to AggregationAuthorization.
response = client.put_aggregation_authorization(
AuthorizedAccountId=ACCOUNT_ID,
AuthorizedAwsRegion=TEST_REGION,
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(10)],
)
agg_auth_arn = response["AggregationAuthorization"]["AggregationAuthorizationArn"]
rsp = client.list_tags_for_resource(ResourceArn=agg_auth_arn)
tags = rsp["Tags"]
client.tag_resource(ResourceArn=agg_auth_arn, Tags=new_tags)
tags.extend(new_tags)
updated_rsp = client.list_tags_for_resource(ResourceArn=agg_auth_arn)
assert tags == updated_rsp["Tags"]
# Verify keys added to ConfigRule, when implemented.
@mock_config
def test_untag_resource():
"""Test the ConfigSource API untag_resource()."""
client = boto3.client("config", region_name=TEST_REGION)
# Try an ARN when there are no configs instantiated.
no_config_arn = "no_configs"
with pytest.raises(ClientError) as cerr:
client.untag_resource(
ResourceArn=no_config_arn, TagKeys=["untest_key", "untest_value"]
)
assert cerr.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
f"ResourceArn '{no_config_arn}' does not exist"
in cerr.value.response["Error"]["Message"]
)
# Try an invalid ARN.
bad_arn = "bad_arn"
with pytest.raises(ClientError) as cerr:
client.untag_resource(
ResourceArn=bad_arn, TagKeys=["untest_key", "untest_value"]
)
assert cerr.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
f"ResourceArn '{bad_arn}' does not exist"
in cerr.value.response["Error"]["Message"]
)
# Create some configs and use the ARN from one of them for testing the
# tags argument.
config_aggs = config_aggregators_info(client)
good_arn = config_aggs[1]["arn"]
# Try specifying more than 50 keys.
with pytest.raises(ClientError) as cerr:
client.untag_resource(
ResourceArn=good_arn, TagKeys=[f"{x}" for x in range(MAX_TAGS_IN_ARG + 1)],
)
assert cerr.value.response["Error"]["Code"] == "ValidationException"
assert (
"at 'tags' failed to satisfy constraint: Member must have length "
"less than or equal to 50"
) in cerr.value.response["Error"]["Message"]
# Try specifying an invalid key -- it should be ignored.
client.untag_resource(ResourceArn=good_arn, TagKeys=["foo"])
# Try a mix of existing and non-existing tags.
rsp = client.list_tags_for_resource(ResourceArn=good_arn)
tags = rsp["Tags"]
client.untag_resource(ResourceArn=good_arn, TagKeys=["10", "foo", "13"])
updated_rsp = client.list_tags_for_resource(ResourceArn=good_arn)
expected_tags = [x for x in tags if x["Key"] not in ["10", "13"]]
assert expected_tags == updated_rsp["Tags"]
# Verify keys removed from ConfigurationAggregator. Add a new tag to
# the current set of tags, then delete the new tag. The original set
# of tags should remain.
rsp = client.list_tags_for_resource(ResourceArn=good_arn)
tags = rsp["Tags"]
test_tags = [{"Key": "test_key", "Value": "test_value"}]
client.tag_resource(ResourceArn=good_arn, Tags=test_tags)
client.untag_resource(ResourceArn=good_arn, TagKeys=[test_tags[0]["Key"]])
updated_rsp = client.list_tags_for_resource(ResourceArn=good_arn)
assert tags == updated_rsp["Tags"]
# Verify keys removed from AggregationAuthorization. Add a new tag to
# the current set of tags, then delete the new tag. The original set
# of tags should remain.
response = client.put_aggregation_authorization(
AuthorizedAccountId=ACCOUNT_ID,
AuthorizedAwsRegion=TEST_REGION,
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(10)],
)
agg_auth_arn = response["AggregationAuthorization"]["AggregationAuthorizationArn"]
rsp = client.list_tags_for_resource(ResourceArn=agg_auth_arn)
tags = rsp["Tags"]
test_tags = [{"Key": "test_key", "Value": "test_value"}]
client.tag_resource(ResourceArn=agg_auth_arn, Tags=test_tags)
client.untag_resource(ResourceArn=agg_auth_arn, TagKeys=[test_tags[0]["Key"]])
updated_rsp = client.list_tags_for_resource(ResourceArn=agg_auth_arn)
assert tags == updated_rsp["Tags"]
# Delete all the tags.
rsp = client.list_tags_for_resource(ResourceArn=good_arn)
client.untag_resource(ResourceArn=good_arn, TagKeys=[x["Key"] for x in rsp["Tags"]])
updated_rsp = client.list_tags_for_resource(ResourceArn=good_arn)
assert not updated_rsp["Tags"]
# Verify keys removed from ConfigRule, when implemented.
@mock_config
def test_list_tags_for_resource():
"""Test the ConfigSource API list_tags_for_resource()."""
client = boto3.client("config", region_name=TEST_REGION)
# Try an invalid ARN.
bad_arn = "bad_arn"
with pytest.raises(ClientError) as cerr:
client.list_tags_for_resource(ResourceArn=bad_arn)
assert cerr.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
f"ResourceArn '{bad_arn}' does not exist"
in cerr.value.response["Error"]["Message"]
)
# Create some configs and use the ARN from one of them for testing the
# tags argument.
config_aggs = config_aggregators_info(client)
good_arn = config_aggs[1]["arn"]
# Try a limit that is out of range (> 100).
with pytest.raises(ClientError) as cerr:
client.list_tags_for_resource(ResourceArn=good_arn, Limit=101)
assert cerr.value.response["Error"]["Code"] == "InvalidLimitException"
assert (
"Value '101' at 'limit' failed to satisfy constraint"
in cerr.value.response["Error"]["Message"]
)
# Verify there are 10 tags, 10 through 19.
expected_tags = [{"Key": f"{x}", "Value": f"{x}"} for x in range(10, 20)]
rsp = client.list_tags_for_resource(ResourceArn=good_arn)
assert expected_tags == rsp["Tags"]