Add AWS ConfigService put_config_rule, delete_config_rule, describe_config_rule (#4171)

Co-authored-by: Karri Balk <kbalk@users.noreply.github.com>
This commit is contained in:
kbalk 2021-08-21 00:45:52 -04:00 committed by GitHub
parent bd5ab53241
commit b9c7ec383c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 3748 additions and 75 deletions

View File

@ -2450,7 +2450,7 @@
- [X] batch_get_aggregate_resource_config
- [X] batch_get_resource_config
- [X] delete_aggregation_authorization
- [ ] delete_config_rule
- [X] delete_config_rule
- [X] delete_configuration_aggregator
- [X] delete_configuration_recorder
- [ ] delete_conformance_pack
@ -2471,7 +2471,7 @@
- [ ] describe_compliance_by_config_rule
- [ ] describe_compliance_by_resource
- [ ] describe_config_rule_evaluation_status
- [ ] describe_config_rules
- [X] describe_config_rules
- [ ] describe_configuration_aggregator_sources_status
- [X] describe_configuration_aggregators
- [X] describe_configuration_recorder_status
@ -2511,7 +2511,7 @@
- [ ] list_stored_queries
- [X] list_tags_for_resource
- [X] put_aggregation_authorization
- [ ] put_config_rule
- [X] put_config_rule
- [X] put_configuration_aggregator
- [X] put_configuration_recorder
- [ ] put_conformance_pack

View File

@ -1,5 +1,6 @@
include README.md LICENSE AUTHORS.md
include requirements.txt requirements-dev.txt tox.ini
include moto/config/resources/aws_managed_rules.json
include moto/ec2/resources/instance_types.json
include moto/ec2/resources/instance_type_offerings/*/*.json
include moto/ec2/resources/amis.json

View File

@ -1,16 +1,14 @@
from __future__ import unicode_literals
from moto.core.exceptions import JsonRESTError
class NameTooLongException(JsonRESTError):
code = 400
def __init__(self, name, location):
def __init__(self, name, location, max_limit=256):
message = (
"1 validation error detected: Value '{name}' at '{location}' failed to satisfy"
" constraint: Member must have length less than or equal to 256".format(
name=name, location=location
)
f"1 validation error detected: Value '{name}' at '{location}' "
f"failed to satisfy constraint: Member must have length less "
f"than or equal to {max_limit}"
)
super().__init__("ValidationException", message)
@ -234,11 +232,13 @@ class TagKeyTooBig(JsonRESTError):
class TagValueTooBig(JsonRESTError):
code = 400
def __init__(self, tag):
def __init__(self, tag, param="tags.X.member.value"):
super().__init__(
"ValidationException",
"1 validation error detected: Value '{}' at 'tags.X.member.value' failed to satisfy "
"constraint: Member must have length less than or equal to 256".format(tag),
"1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 256".format(
tag, param
),
)
@ -340,9 +340,6 @@ class TooManyResourceKeys(JsonRESTError):
bad_list=bad_list
)
)
# For PY2:
message = str(message)
super().__init__("ValidationException", message)
@ -366,3 +363,46 @@ class NoSuchOrganizationConformancePackException(JsonRESTError):
def __init__(self, message):
super().__init__("NoSuchOrganizationConformancePackException", message)
class MaxNumberOfConfigRulesExceededException(JsonRESTError):
code = 400
def __init__(self, name, max_limit):
message = (
f"Failed to put config rule '{name}' because the maximum number "
f"of config rules: {max_limit} is reached."
)
super().__init__("MaxNumberOfConfigRulesExceededException", message)
class ResourceInUseException(JsonRESTError):
code = 400
def __init__(self, message):
super().__init__("ResourceInUseException", message)
class InsufficientPermissionsException(JsonRESTError):
code = 400
def __init__(self, message):
super().__init__("InsufficientPermissionsException", message)
class NoSuchConfigRuleException(JsonRESTError):
code = 400
def __init__(self, rule_name):
message = (
f"The ConfigRule '{rule_name}' provided in the request is "
f"invalid. Please check the configRule name"
)
super().__init__("NoSuchConfigRuleException", message)
class MissingRequiredConfigRuleParameterException(JsonRESTError):
code = 400
def __init__(self, message):
super().__init__("ParamValidationError", message)

View File

@ -1,3 +1,5 @@
"""Implementation of the AWS Config Service APIs."""
import json
import re
import time
import random
@ -42,14 +44,20 @@ from moto.config.exceptions import (
InvalidResultTokenException,
ValidationException,
NoSuchOrganizationConformancePackException,
MaxNumberOfConfigRulesExceededException,
InsufficientPermissionsException,
NoSuchConfigRuleException,
ResourceInUseException,
MissingRequiredConfigRuleParameterException,
)
from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.core.responses import AWSServiceSpec
from moto.iam.config import role_config_query, policy_config_query
from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.utilities.utils import load_resource
POP_STRINGS = [
"capitalizeStart",
@ -60,6 +68,7 @@ POP_STRINGS = [
"CapitalizeARN",
]
DEFAULT_PAGE_SIZE = 100
CONFIG_RULE_PAGE_SIZE = 25
# Map the Config resource type to a backend:
RESOURCE_MAP = {
@ -69,8 +78,13 @@ RESOURCE_MAP = {
"AWS::IAM::Policy": policy_config_query,
}
CAMEL_TO_SNAKE_REGEX = re.compile(r"(?<!^)(?=[A-Z])")
MAX_TAGS_IN_ARG = 50
MANAGED_RULES = load_resource(__name__, "resources/aws_managed_rules.json")
MANAGED_RULES_CONSTRAINTS = MANAGED_RULES["ManagedRules"]
def datetime2int(date):
return int(time.mktime(date.timetuple()))
@ -105,7 +119,8 @@ def validate_tag_key(tag_key, exception_param="tags.X.member.key"):
"""Validates the tag key.
:param tag_key: The tag key to check against.
:param exception_param: The exception parameter to send over to help format the message. This is to reflect
:param exception_param: The exception parameter to send over to help
format the message. This is to reflect
the difference between the tag and untag APIs.
:return:
"""
@ -114,7 +129,8 @@ def validate_tag_key(tag_key, exception_param="tags.X.member.key"):
raise TagKeyTooBig(tag_key, param=exception_param)
# Validate that the tag key fits the proper Regex:
# [\w\s_.:/=+\-@]+ SHOULD be the same as the Java regex on the AWS documentation: [\p{L}\p{Z}\p{N}_.:/=+\-@]+
# [\w\s_.:/=+\-@]+ SHOULD be the same as the Java regex on the AWS
# documentation: [\p{L}\p{Z}\p{N}_.:/=+\-@]+
match = re.findall(r"[\w\s_.:/=+\-@]+", tag_key)
# Kudos if you can come up with a better way of doing a global search :)
if not match or len(match[0]) < len(tag_key):
@ -152,13 +168,34 @@ def validate_tags(tags):
return proper_tags
def convert_to_class_args(dict_arg):
"""Return dict that can be used to instantiate it's representative class.
Given a dictionary in the incoming API request, convert the keys to
snake case to use as arguments when instatiating the representative
class's __init__().
"""
class_args = {}
for key, value in dict_arg.items():
class_args[CAMEL_TO_SNAKE_REGEX.sub("_", key).lower()] = value
# boto detects if extra/unknown arguments are provided, so it's not
# necessary to do so here.
return class_args
class ConfigEmptyDictable(BaseModel):
"""Base class to make serialization easy. This assumes that the sub-class will NOT return 'None's in the JSON."""
"""Base class to make serialization easy.
This assumes that the sub-class will NOT return 'None's in the JSON.
"""
def __init__(self, capitalize_start=False, capitalize_arn=True):
"""Assists with the serialization of the config object
:param capitalize_start: For some Config services, the first letter is lowercase -- for others it's capital
:param capitalize_arn: For some Config services, the API expects 'ARN' and for others, it expects 'Arn'
:param capitalize_start: For some Config services, the first letter
is lowercase -- for others it's capital
:param capitalize_arn: For some Config services, the API expects
'ARN' and for others, it expects 'Arn'
"""
self.capitalize_start = capitalize_start
self.capitalize_arn = capitalize_arn
@ -265,11 +302,13 @@ class AccountAggregatorSource(ConfigEmptyDictable):
def __init__(self, account_ids, aws_regions=None, all_aws_regions=None):
super().__init__(capitalize_start=True)
# Can't have both the regions and all_regions flag present -- also can't have them both missing:
# Can't have both the regions and all_regions flag present -- also
# can't have them both missing:
if aws_regions and all_aws_regions:
raise InvalidParameterValueException(
"Your configuration aggregator contains a list of regions and also specifies "
"the use of all regions. You must choose one of these options."
"Your configuration aggregator contains a list of regions "
"and also specifies the use of all regions. You must choose "
"one of these options."
)
if not (aws_regions or all_aws_regions):
@ -291,7 +330,8 @@ class OrganizationAggregationSource(ConfigEmptyDictable):
def __init__(self, role_arn, aws_regions=None, all_aws_regions=None):
super().__init__(capitalize_start=True, capitalize_arn=False)
# Can't have both the regions and all_regions flag present -- also can't have them both missing:
# Can't have both the regions and all_regions flag present -- also
# can't have them both missing:
if aws_regions and all_aws_regions:
raise InvalidParameterValueException(
"Your configuration aggregator contains a list of regions and also specifies "
@ -411,6 +451,399 @@ class OrganizationConformancePack(ConfigEmptyDictable):
self.last_update_time = datetime2int(datetime.utcnow())
class Scope(ConfigEmptyDictable):
"""Defines resources that can trigger an evaluation for the rule.
Per boto3 documentation, Scope can be one of:
- one or more resource types,
- combo of one resource type and one resource ID,
- combo of tag key and value.
If no scope is specified, evaluations are trigged when any resource
in the recording group changes.
"""
def __init__(
self,
compliance_resource_types=None,
tag_key=None,
tag_value=None,
compliance_resource_id=None,
):
super().__init__(capitalize_start=True, capitalize_arn=False)
self.tags = None
if tag_key or tag_value:
if tag_value and not tag_key:
raise InvalidParameterValueException(
"Tag key should not be empty when tag value is provided in scope"
)
if tag_key and len(tag_key) > 128:
raise TagKeyTooBig(tag_key, "ConfigRule.Scope.TagKey")
if tag_value and len(tag_value) > 256:
raise TagValueTooBig(tag_value, "ConfigRule.Scope.TagValue")
self.tags = {tag_key: tag_value}
# Can't use more than one combo to specify scope - either tags,
# resource types, or resource id and resource type.
if self.tags and (compliance_resource_types or compliance_resource_id):
raise InvalidParameterValueException(
"Scope cannot be applied to both resource and tag"
)
if compliance_resource_id and len(compliance_resource_types) != 1:
raise InvalidParameterValueException(
"A single resourceType should be provided when resourceId "
"is provided in scope"
)
self.compliance_resource_types = compliance_resource_types
self.compliance_resource_id = compliance_resource_id
class SourceDetail(ConfigEmptyDictable):
"""Source and type of event triggering AWS Config resource evaluation.
Applies only to customer rules.
"""
MESSAGE_TYPES = {
"ConfigurationItemChangeNotification",
"ConfigurationSnapshotDeliveryCompleted",
"OversizedConfigurationItemChangeNotification",
"ScheduledNotification",
}
DEFAULT_FREQUENCY = "TwentyFour_Hours"
FREQUENCY_TYPES = {
"One_Hour",
"Six_Hours",
"Three_Hours",
"Twelve_Hours",
"TwentyFour_Hours",
}
EVENT_SOURCES = ["aws.config"]
def __init__(
self, event_source=None, message_type=None, maximum_execution_frequency=None
):
super().__init__(capitalize_start=True, capitalize_arn=False)
# If the event_source or message_type fields are not provided,
# boto3 reports: "SourceDetails should be null/empty if the owner is
# AWS. SourceDetails should be provided if the owner is CUSTOM_LAMBDA."
# A more specific message will be used here instead.
if not event_source:
raise MissingRequiredConfigRuleParameterException(
"Missing required parameter in ConfigRule.SourceDetails: "
"'EventSource'"
)
if event_source not in SourceDetail.EVENT_SOURCES:
raise ValidationException(
f"Value '{event_source}' at "
f"'configRule.source.sourceDetails.eventSource' failed "
f"to satisfy constraint: Member must satisfy enum value set: {{"
+ ", ".join((SourceDetail.EVENT_SOURCES))
+ "}"
)
if not message_type:
# boto3 doesn't have a specific error if this field is missing.
raise MissingRequiredConfigRuleParameterException(
"Missing required parameter in ConfigRule.SourceDetails: 'MessageType'"
)
if message_type not in SourceDetail.MESSAGE_TYPES:
raise ValidationException(
f"Value '{message_type}' at "
f"'configRule.source.sourceDetails.message_type' failed "
f"to satisfy constraint: Member must satisfy enum value set: {{"
+ ", ".join(sorted(SourceDetail.MESSAGE_TYPES))
+ "}"
)
if maximum_execution_frequency:
if maximum_execution_frequency not in SourceDetail.FREQUENCY_TYPES:
raise ValidationException(
f"Value '{maximum_execution_frequency}' at "
f"'configRule.source.sourceDetails.maximumExecutionFrequency' "
f"failed to satisfy constraint: "
f"Member must satisfy enum value set: {{"
+ ", ".join(sorted(SourceDetail.FREQUENCY_TYPES))
+ "}"
)
if message_type in [
"ConfigurationItemChangeNotification",
"OversizedConfigurationItemChangeNotification",
]:
raise InvalidParameterValueException(
"A maximum execution frequency is not allowed if "
"MessageType is ConfigurationItemChangeNotification or "
"OversizedConfigurationItemChangeNotification"
)
else:
# If no value is specified, use a default value for
# maximum_execution_frequency for message types representing a
# periodic trigger.
if message_type in [
"ScheduledNotification",
"ConfigurationSnapshotDeliveryCompleted",
]:
maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY
self.event_source = event_source
self.message_type = message_type
self.maximum_execution_frequency = maximum_execution_frequency
class Source(ConfigEmptyDictable):
"""Defines rule owner, id and notification for triggering evaluation."""
OWNERS = {"AWS", "CUSTOM_LAMBDA"}
def __init__(self, region, owner, source_identifier, source_details=None):
super().__init__(capitalize_start=True, capitalize_arn=False)
if owner not in Source.OWNERS:
raise ValidationException(
f"Value '{owner}' at 'configRule.source.owner' failed to "
f"satisfy constraint: Member must satisfy enum value set: {{"
+ ", ".join(sorted(Source.OWNERS))
+ "}"
)
if owner == "AWS":
# Can the Source ID be found in the dict of managed rule IDs?
if source_identifier not in MANAGED_RULES_CONSTRAINTS:
raise InvalidParameterValueException(
f"The sourceIdentifier {source_identifier} is invalid. "
f"Please refer to the documentation for a list of valid "
f"sourceIdentifiers that can be used when AWS is the Owner"
)
if source_details:
raise InvalidParameterValueException(
"SourceDetails should be null/empty if the owner is AWS. "
"SourceDetails should be provided if the owner is "
"CUSTOM_LAMBDA"
)
self.owner = owner
self.source_identifier = source_identifier
self.source_details = None
return
# Otherwise, owner == "CUSTOM_LAMBDA"
if not source_details:
raise InvalidParameterValueException(
"SourceDetails should be null/empty if the owner is AWS. "
"SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
)
# Import is slow and as it's not needed for all config service
# operations, only load it if needed.
from moto.awslambda import lambda_backends
lambda_func = lambda_backends[region].get_function(source_identifier)
if not lambda_func:
raise InsufficientPermissionsException(
f"The AWS Lambda function {source_identifier} cannot be "
f"invoked. Check the specified function ARN, and check the "
f"function's permissions"
)
details = []
for detail in source_details:
detail_dict = convert_to_class_args(detail)
details.append(SourceDetail(**detail_dict))
self.source_details = details
self.owner = owner
self.source_identifier = source_identifier
def to_dict(self):
"""Format the SourceDetails properly."""
result = super().to_dict()
if self.source_details:
result["SourceDetails"] = [x.to_dict() for x in self.source_details]
return result
class ConfigRule(ConfigEmptyDictable):
"""AWS Config Rule to evaluate compliance of resources to configuration.
Can be a managed or custom config rule. Contains the instantiations of
the Source and SourceDetail classes, and optionally the Scope class.
"""
MAX_RULES = 150
RULE_STATES = {"ACTIVE", "DELETING", "DELETING_RESULTS", "EVALUATING"}
def __init__(self, region, config_rule, tags):
super().__init__(capitalize_start=True, capitalize_arn=False)
self.config_rule_name = config_rule.get("ConfigRuleName")
if config_rule.get("ConfigRuleArn") or config_rule.get("ConfigRuleId"):
raise InvalidParameterValueException(
"ConfigRule Arn and Id can not be specified when creating a "
"new ConfigRule. ConfigRule Arn and Id are generated by the "
"service. Please try the request again without specifying "
"ConfigRule Arn or Id"
)
self.maximum_execution_frequency = None # keeps pylint happy
self.modify_fields(region, config_rule, tags)
self.config_rule_id = f"config-rule-{random_string():.6}"
self.config_rule_arn = f"arn:aws:config:{region}:{DEFAULT_ACCOUNT_ID}:config-rule/{self.config_rule_id}"
def modify_fields(self, region, config_rule, tags):
"""Initialize or update ConfigRule fields."""
self.config_rule_state = config_rule.get("ConfigRuleState", "ACTIVE")
if self.config_rule_state not in ConfigRule.RULE_STATES:
raise ValidationException(
f"Value '{self.config_rule_state}' at "
f"'configRule.configRuleState' failed to satisfy constraint: "
f"Member must satisfy enum value set: {{"
+ ", ".join(sorted(ConfigRule.RULE_STATES))
+ "}"
)
if self.config_rule_state != "ACTIVE":
raise InvalidParameterValueException(
f"The ConfigRuleState {self.config_rule_state} is invalid. "
f"Only the following values are permitted: ACTIVE"
)
self.description = config_rule.get("Description")
self.scope = None
if "Scope" in config_rule:
scope_dict = convert_to_class_args(config_rule["Scope"])
self.scope = Scope(**scope_dict)
source_dict = convert_to_class_args(config_rule["Source"])
self.source = Source(region, **source_dict)
self.input_parameters = config_rule.get("InputParameters")
self.input_parameters_dict = {}
if self.input_parameters:
try:
# A dictionary will be more useful when these parameters
# are actually needed.
self.input_parameters_dict = json.loads(self.input_parameters)
except ValueError:
raise InvalidParameterValueException( # pylint: disable=raise-missing-from
f"Invalid json {self.input_parameters} passed in the "
f"InputParameters field"
)
self.maximum_execution_frequency = config_rule.get("MaximumExecutionFrequency")
if self.maximum_execution_frequency:
if self.maximum_execution_frequency not in SourceDetail.FREQUENCY_TYPES:
raise ValidationException(
f"Value '{self.maximum_execution_frequency}' at "
f"'configRule.maximumExecutionFrequency' failed to "
f"satisfy constraint: Member must satisfy enum value set: {{"
+ ", ".join(sorted(SourceDetail.FREQUENCY_TYPES))
+ "}"
)
# For an AWS managed rule, validate the parameters and trigger type.
# Verify the MaximumExecutionFrequency makes sense as well.
if self.source.owner == "AWS":
self.validate_managed_rule()
else:
# Per the AWS documentation for a custom rule, ConfigRule's
# MaximumExecutionFrequency can only be set if the message type
# is ConfigSnapshotDeliveryProperties. However, if
# ConfigSnapshotDeliveryProperties is used, the AWS console
# leaves the Trigger Type blank and doesn't show the frequency.
# If you edit the rule, it doesn't show the frequency either.
#
# If you provide two custom rules, one with a message type of
# ConfigurationSnapshotDeliveryCompleted, one with
# ScheduleNotification and specify a MaximumExecutionFrequency
# for each, the first one is shown on the AWS console and the
# second frequency is shown on the edit page.
#
# If you provide a custom rule for
# OversizedConfigurationItemChangeNotification (not a periodic
# trigger) with a MaximumExecutionFrequency for ConfigRule itself,
# boto3 doesn't complain and describe_config_rule() shows the
# frequency, but the AWS console and the edit page do not.
#
# So I'm not sure how to validate this situation or when to
# set this value to a default value.
pass
self.created_by = config_rule.get("CreatedBy")
if self.created_by:
raise InvalidParameterValueException(
"AWS Config populates the CreatedBy field for "
"ServiceLinkedConfigRule. Try again without populating the "
"CreatedBy field"
)
self.last_updated_time = datetime2int(datetime.utcnow())
self.tags = tags
def validate_managed_rule(self):
"""Validate parameters specific to managed rules."""
rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier]
param_names = self.input_parameters_dict.keys()
# Verify input parameter names are actual parameters for the rule ID.
if param_names:
allowed_names = {x["Name"] for x in rule_info["Parameters"]}
if allowed_names.difference(set(param_names)):
raise InvalidParameterValueException(
"Unknown parameters provided in the inputParameters: "
+ self.input_parameters.replace('"', '\\"')
)
# Verify all the required parameters are specified.
required_names = {
x["Name"] for x in rule_info["Parameters"] if not x["Optional"]
}
diffs = required_names.difference(set(param_names))
if diffs:
raise InvalidParameterValueException(
"The required parameter ["
+ ", ".join(sorted(diffs))
+ "] is not present in the inputParameters"
)
# boto3 doesn't appear to be checking for valid types in the
# InputParameters. It did give an error if a unquoted number was
# used: "Blank spaces are not acceptable for input parameter:
# MinimumPasswordLength. InputParameters':
# '{"RequireNumbers":"true","MinimumPasswordLength":10}'
# but I'm not going to attempt to detect that error. I could
# check for ints, floats, strings and stringmaps, but boto3 doesn't
# check.
# WARNING: The AWS documentation indicates MaximumExecutionFrequency
# can be specified for managed rules triggered at a periodic frequency.
# However, boto3 allows a MaximumExecutionFrequency to be specified
# for a AWS managed rule regardless of the frequency type. Also of
# interest: triggers of "Configuration Changes and Periodic",
# i.e., both trigger types. But again, the trigger type is ignored.
# if rule_info["Trigger type"] == "Configuration changes":
# if self.maximum_execution_frequency:
# raise InvalidParameterValueException(
# "A maximum execution frequency is not allowed for "
# "rules triggered by configuration changes"
# )
#
# WARNING: boto3's describe_config_rule is not showing the
# MaximumExecutionFrequency value as being updated, but the AWS
# console shows the default value on the console. The default value
# is used even if the rule is non-periodic
# if "Periodic" in rule_info["Trigger type"]:
# if not self.maximum_execution_frequency:
# self.maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY
# if not self.maximum_execution_frequency:
# self.maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY
# Verify the rule is allowed for this region -- not yet implemented.
class ConfigBackend(BaseBackend):
def __init__(self):
self.recorders = {}
@ -418,6 +851,7 @@ class ConfigBackend(BaseBackend):
self.config_aggregators = {}
self.aggregation_authorizations = {}
self.organization_conformance_packs = {}
self.config_rules = {}
self.config_schema = None
def _validate_resource_types(self, resource_list):
@ -429,11 +863,8 @@ class ConfigBackend(BaseBackend):
# Verify that each entry exists in the supported list:
bad_list = []
for resource in resource_list:
# For PY2:
r_str = str(resource)
if r_str not in self.config_schema.shapes["ResourceType"]["enum"]:
bad_list.append(r_str)
if resource not in self.config_schema.shapes["ResourceType"]["enum"]:
bad_list.append(resource)
if bad_list:
raise InvalidResourceTypeException(
@ -470,14 +901,16 @@ class ConfigBackend(BaseBackend):
# Tag validation:
tags = validate_tags(config_aggregator.get("Tags", []))
# Exception if both AccountAggregationSources and OrganizationAggregationSource are supplied:
# Exception if both AccountAggregationSources and
# OrganizationAggregationSource are supplied:
if config_aggregator.get("AccountAggregationSources") and config_aggregator.get(
"OrganizationAggregationSource"
):
raise InvalidParameterValueException(
"The configuration aggregator cannot be created because your request contains both the"
" AccountAggregationSource and the OrganizationAggregationSource. Include only "
"one aggregation source and try again."
"The configuration aggregator cannot be created because your "
"request contains both the AccountAggregationSource and the "
"OrganizationAggregationSource. Include only one aggregation "
"source and try again."
)
# If neither are supplied:
@ -485,8 +918,9 @@ class ConfigBackend(BaseBackend):
"AccountAggregationSources"
) and not config_aggregator.get("OrganizationAggregationSource"):
raise InvalidParameterValueException(
"The configuration aggregator cannot be created because your request is missing either "
"the AccountAggregationSource or the OrganizationAggregationSource. Include the "
"The configuration aggregator cannot be created because your "
"request is missing either the AccountAggregationSource or "
"the OrganizationAggregationSource. Include the "
"appropriate aggregation source and try again."
)
@ -760,16 +1194,19 @@ class ConfigBackend(BaseBackend):
delivery_channel.get("name"), "deliveryChannel.name"
)
# We are going to assume that the bucket exists -- but will verify if the bucket provided is blank:
# We are going to assume that the bucket exists -- but will verify if
# the bucket provided is blank:
if not delivery_channel.get("s3BucketName"):
raise NoSuchBucketException()
# We are going to assume that the bucket has the correct policy attached to it. We are only going to verify
# We are going to assume that the bucket has the correct policy
# attached to it. We are only going to verify
# if the prefix provided is not an empty string:
if delivery_channel.get("s3KeyPrefix", None) == "":
raise InvalidS3KeyPrefixException()
# Ditto for SNS -- Only going to assume that the ARN provided is not an empty string:
# Ditto for SNS -- Only going to assume that the ARN provided is not
# an empty string:
if delivery_channel.get("snsTopicARN", None) == "":
raise InvalidSNSTopicARNException()
@ -863,7 +1300,9 @@ class ConfigBackend(BaseBackend):
limit,
next_token,
):
"""This will query against the mocked AWS Config (non-aggregated) listing function that must exist for the resource backend.
"""Queries against AWS Config (non-aggregated) listing function.
The listing function must exist for the resource backend.
:param resource_type:
:param backend_region:
@ -887,8 +1326,9 @@ class ConfigBackend(BaseBackend):
if resource_ids and len(resource_ids) > 20:
raise TooManyResourceIds()
# If the resource type exists and the backend region is implemented in moto, then
# call upon the resource type's Config Query class to retrieve the list of resources that match the criteria:
# If resource type exists and the backend region is implemented in
# moto, then call upon the resource type's Config Query class to
# retrieve the list of resources that match the criteria:
if RESOURCE_MAP.get(resource_type, {}):
# Is this a global resource type? -- if so, re-write the region to 'global':
backend_query_region = (
@ -897,7 +1337,8 @@ class ConfigBackend(BaseBackend):
if RESOURCE_MAP[resource_type].backends.get("global"):
backend_region = "global"
# For non-aggregated queries, the we only care about the backend_region. Need to verify that moto has implemented
# For non-aggregated queries, the we only care about the
# backend_region. Need to verify that moto has implemented
# the region for the given backend:
if RESOURCE_MAP[resource_type].backends.get(backend_region):
# Fetch the resources for the backend's region:
@ -931,10 +1372,12 @@ class ConfigBackend(BaseBackend):
def list_aggregate_discovered_resources(
self, aggregator_name, resource_type, filters, limit, next_token
):
"""This will query against the mocked AWS Config listing function that must exist for the resource backend.
"""Queries AWS Config listing function that must exist for resource backend.
As far a moto goes -- the only real difference between this function and the `list_discovered_resources` function is that
this will require a Config Aggregator be set up a priori and can search based on resource regions.
As far a moto goes -- the only real difference between this function
and the `list_discovered_resources` function is that this will require
a Config Aggregator be set up a priori and can search based on resource
regions.
:param aggregator_name:
:param resource_type:
@ -954,8 +1397,9 @@ class ConfigBackend(BaseBackend):
if limit > DEFAULT_PAGE_SIZE:
raise InvalidLimitException(limit)
# If the resource type exists and the backend region is implemented in moto, then
# call upon the resource type's Config Query class to retrieve the list of resources that match the criteria:
# If the resource type exists and the backend region is implemented
# in moto, then call upon the resource type's Config Query class to
# retrieve the list of resources that match the criteria:
if RESOURCE_MAP.get(resource_type, {}):
# We only care about a filter's Region, Resource Name, and Resource ID:
resource_region = filters.get("Region")
@ -994,11 +1438,16 @@ class ConfigBackend(BaseBackend):
return result
def get_resource_config_history(self, resource_type, resource_id, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
"""Returns configuration of resource for the current regional backend.
NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!)
As such, the later_time, earlier_time, limit, and next_token are ignored as this will only
return 1 item. (If no items, it raises an exception)
Item returned in AWS Config format.
NOTE: This is --NOT-- returning history as it is not supported in
moto at this time. (PR's welcome!)
As such, the later_time, earlier_time, limit, and next_token are
ignored as this will only return 1 item. (If no items, it raises an
exception).
"""
# If the type isn't implemented then we won't find the item:
if resource_type not in RESOURCE_MAP:
@ -1027,7 +1476,9 @@ class ConfigBackend(BaseBackend):
return {"configurationItems": [item]}
def batch_get_resource_config(self, resource_keys, backend_region):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
"""Returns configuration of resource for the current regional backend.
Item is returned in AWS Config format.
:param resource_keys:
:param backend_region:
@ -1078,10 +1529,14 @@ class ConfigBackend(BaseBackend):
def batch_get_aggregate_resource_config(
self, aggregator_name, resource_identifiers
):
"""Returns the configuration of an item in the AWS Config format of the resource for the current regional backend.
"""Returns configuration of resource for current regional backend.
As far a moto goes -- the only real difference between this function and the `batch_get_resource_config` function is that
this will require a Config Aggregator be set up a priori and can search based on resource regions.
Item is returned in AWS Config format.
As far a moto goes -- the only real difference between this function
and the `batch_get_resource_config` function is that this will require
a Config Aggregator be set up a priori and can search based on resource
regions.
Note: moto will IGNORE the resource account ID in the search query.
"""
@ -1140,7 +1595,8 @@ class ConfigBackend(BaseBackend):
if not result_token:
raise InvalidResultTokenException()
# Moto only supports PutEvaluations with test mode currently (missing rule and token support)
# Moto only supports PutEvaluations with test mode currently
# (missing rule and token support).
if not test_mode:
raise NotImplementedError(
"PutEvaluations without TestMode is not yet implemented"
@ -1206,8 +1662,9 @@ class ConfigBackend(BaseBackend):
if not pack:
raise NoSuchOrganizationConformancePackException(
"One or more organization conformance packs with specified names are not present. "
"Ensure your names are correct and try your request again later."
"One or more organization conformance packs with "
"specified names are not present. Ensure your names are "
"correct and try your request again later."
)
packs.append(pack.to_dict())
@ -1224,8 +1681,9 @@ class ConfigBackend(BaseBackend):
if not pack:
raise NoSuchOrganizationConformancePackException(
"One or more organization conformance packs with specified names are not present. "
"Ensure your names are correct and try your request again later."
"One or more organization conformance packs with "
"specified names are not present. Ensure your names "
"are correct and try your request again later."
)
packs.append(pack)
@ -1271,9 +1729,8 @@ class ConfigBackend(BaseBackend):
if not pack:
raise NoSuchOrganizationConformancePackException(
"Could not find an OrganizationConformancePack for given request with resourceName {}".format(
name
)
"Could not find an OrganizationConformancePack for given "
"request with resourceName {}".format(name)
)
self.organization_conformance_packs.pop(name)
@ -1281,8 +1738,7 @@ class ConfigBackend(BaseBackend):
def _match_arn(self, resource_arn):
"""Return config instance that has a matching ARN."""
# The allowed resources are ConfigRule, ConfigurationAggregator,
# and AggregatorAuthorization. ConfigRule isn't currently
# supported.
# and AggregatorAuthorization.
allowed_resources = [
{
"configs": self.config_aggregators,
@ -1292,6 +1748,7 @@ class ConfigBackend(BaseBackend):
"configs": self.aggregation_authorizations,
"arn_attribute": "aggregation_authorization_arn",
},
{"configs": self.config_rules, "arn_attribute": "config_rule_arn"},
]
# Find matching config for given resource_arn among all the
@ -1333,7 +1790,9 @@ class ConfigBackend(BaseBackend):
for tag_key in tag_keys:
matched_config.tags.pop(tag_key, None)
def list_tags_for_resource(self, resource_arn, limit, next_token):
def list_tags_for_resource(
self, resource_arn, limit, next_token
): # pylint: disable=unused-argument
"""Return list of tags for AWS Config resource."""
# The limit argument is essentially ignored as a config instance
# can only have 50 tags, but we'll check the argument anyway.
@ -1345,9 +1804,118 @@ class ConfigBackend(BaseBackend):
matched_config = self._match_arn(resource_arn)
return {
"Tags": [{"Key": k, "Value": v} for k, v in matched_config.tags.items()]
"Tags": [
{"Key": k, "Value": v} for k, v in sorted(matched_config.tags.items())
]
}
def put_config_rule(self, region, config_rule, tags=None):
"""Add/Update config rule for evaluating resource compliance.
TBD - Only the "accounting" of config rules are handled at the
moment. No events are created or triggered. There is no
interaction with the config recorder.
"""
# If there is no rule_name, use the ARN or ID to get the
# rule_name.
rule_name = config_rule.get("ConfigRuleName")
if rule_name:
if len(rule_name) > 128:
raise NameTooLongException(rule_name, "configRule.configRuleName", 128)
else:
# Can we find the rule using the ARN or ID?
rule_arn = config_rule.get("ConfigRuleArn")
rule_id = config_rule.get("ConfigRuleId")
if not rule_arn and not rule_id:
raise InvalidParameterValueException(
"One or more identifiers needs to be provided. Provide "
"Name or Id or Arn"
)
for config_rule_obj in self.config_rules.values():
if rule_id and config_rule_obj.config_rule_id == rule_id:
rule_name = config_rule_obj.config_rule_name
break
if rule_arn and config_rule_obj.config_rule_arn == rule_arn:
rule_name = config_rule_obj.config_rule_name
break
else:
raise InvalidParameterValueException(
"One or more identifiers needs to be provided. Provide "
"Name or Id or Arn"
)
tags = validate_tags(tags or [])
# With the rule_name, determine whether it's for an existing rule
# or whether a new rule should be created.
rule = self.config_rules.get(rule_name)
if rule:
# Rule exists. Make sure it isn't in use for another activity.
rule_state = rule.config_rule_state
if rule_state != "ACTIVE":
activity = "deleted" if rule_state.startswith("DELET") else "evaluated"
raise ResourceInUseException(
f"The rule {rule_name} is currently being {activity}. "
f"Please retry after some time"
)
# Update the current rule.
rule.modify_fields(region, config_rule, tags)
else:
# Create a new ConfigRule if the limit hasn't been reached.
if len(self.config_rules) == ConfigRule.MAX_RULES:
raise MaxNumberOfConfigRulesExceededException(
rule_name, ConfigRule.MAX_RULES
)
rule = ConfigRule(region, config_rule, tags)
self.config_rules[rule_name] = rule
return ""
def describe_config_rules(self, config_rule_names, next_token):
"""Return details for the given ConfigRule names or for all rules."""
result = {"ConfigRules": []}
if not self.config_rules:
return result
rule_list = []
if config_rule_names:
for name in config_rule_names:
if not self.config_rules.get(name):
raise NoSuchConfigRuleException(name)
rule_list.append(name)
else:
rule_list = list(self.config_rules.keys())
# The rules are not sorted alphanumerically.
sorted_rules = sorted(rule_list)
start = 0
if next_token:
if not self.config_rules.get(next_token):
raise InvalidNextTokenException()
start = sorted_rules.index(next_token)
rule_list = sorted_rules[start : start + CONFIG_RULE_PAGE_SIZE]
result["ConfigRules"] = [self.config_rules[x].to_dict() for x in rule_list]
if len(sorted_rules) > (start + CONFIG_RULE_PAGE_SIZE):
result["NextToken"] = sorted_rules[start + CONFIG_RULE_PAGE_SIZE]
return result
def delete_config_rule(self, rule_name):
"""Delete config rule used for evaluating resource compliance."""
rule = self.config_rules.get(rule_name)
if not rule:
raise NoSuchConfigRuleException(rule_name)
# The following logic is not applicable for moto as far as I can tell.
# if rule.config_rule_state == "DELETING":
# raise ResourceInUseException(
# f"The rule {rule_name} is currently being deleted. Please "
# f"retry after some time"
# )
rule.config_rule_state = "DELETING"
self.config_rules.pop(rule_name)
config_backends = {}
for available_region in Session().get_available_regions("config"):

File diff suppressed because it is too large Load Diff

View File

@ -186,14 +186,12 @@ class ConfigResponse(BaseResponse):
conformance_packs = self.config_backend.describe_organization_conformance_packs(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(conformance_packs)
def describe_organization_conformance_pack_statuses(self):
statuses = self.config_backend.describe_organization_conformance_pack_statuses(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(statuses)
def get_organization_conformance_pack_detailed_status(self):
@ -201,14 +199,12 @@ class ConfigResponse(BaseResponse):
statuses = self.config_backend.get_organization_conformance_pack_detailed_status(
self._get_param("OrganizationConformancePackName")
)
return json.dumps(statuses)
def delete_organization_conformance_pack(self):
self.config_backend.delete_organization_conformance_pack(
self._get_param("OrganizationConformancePackName")
)
return ""
def tag_resource(self):
@ -222,3 +218,19 @@ class ConfigResponse(BaseResponse):
self._get_param("ResourceArn"), self._get_param("TagKeys"),
)
return ""
def put_config_rule(self):
self.config_backend.put_config_rule(
self.region, self._get_param("ConfigRule"), self._get_param("Tags"),
)
return ""
def describe_config_rules(self):
rules = self.config_backend.describe_config_rules(
self._get_param("ConfigRuleNames"), self._get_param("NextToken"),
)
return json.dumps(rules)
def delete_config_rule(self):
self.config_backend.delete_config_rule(self._get_param("ConfigRuleName"))
return ""

View File

@ -0,0 +1,141 @@
#!/usr/bin/env python
"""Download markdown files with AWS managed ConfigRule info and convert to JSON.
The first markdown file is read to obtain the names of markdown files for
all the AWS managed config rules. Then each of those markdown files are read
and info is extracted with the final results written to a JSON file.
The JSON output will look as follows:
{
"ManagedRules": [
{
"ACCESS_KEYS_ROTATED": {
"AWS Region": "All supported AWS regions",
"Parameters": [
{
"Default": "90",
"Name": "maxAccessKeyAgeType",
"Optional": false,
"Type": "intDefault"
}
],
"Trigger type": "Periodic"
},
},
...
]
}
"""
import json
import re
import sys
import requests
MANAGED_RULES_OUTPUT_FILENAME = "../moto/config/resources/aws_managed_rules.json"
AWS_MARKDOWN_URL_START = "https://raw.githubusercontent.com/awsdocs/aws-config-developer-guide/main/doc_source/"
LIST_OF_MARKDOWNS_URL = "managed-rules-by-aws-config.md"
def managed_rule_info(lines):
"""Return dict of qualifiers/rules extracted from a markdown file."""
rule_info = {}
label_pattern = re.compile(r"(?:\*\*)(?P<label>[^\*].*)\:\*\*\s?(?P<value>.*)?")
# Examples of parameter definitions:
# maxAccessKeyAgeType: intDefault: 90
# IgnorePublicAcls \(Optional\)Type: StringDefault: True
# MasterAccountId \(Optional\)Type: String
# endpointConfigurationTypesType: String
#
collecting_params = False
params = []
for line in lines:
if not line:
continue
line = line.replace("\\", "").strip()
# Parameters are listed in the lines following the label, so they
# require special processing.
if collecting_params:
# A new header marks the end of the parameters.
if line.startswith("##"):
rule_info["Parameters"] = params
break
if "Type: " in line:
values = re.split(r":\s?", line)
name = values[0]
param_type = values[1]
# If there is no Optional keyword, then sometimes there
# isn't a space between the parameter name and "Type".
name = re.sub("Type$", "", name)
# Sometimes there isn't a space between the type and the
# word "Default".
if "Default" in param_type:
param_type = re.sub("Default$", "", param_type)
optional = False
if "Optional" in line:
optional = True
# Remove "Optional" from the line.
name = name.split()[0]
values_dict = {
"Name": name,
"Optional": optional,
"Type": param_type,
}
# A default value isn't always provided.
if len(values) > 2:
values_dict["Default"] = values[2]
params.append(values_dict)
continue
# Check for a label starting with two asterisks.
matches = re.match(label_pattern, line)
if not matches:
continue
# Look for "Identifier", "Trigger type", "AWS Region" and "Parameters"
# labels and store the values for all but parameters. Parameters
# values aren't on the same line as labels.
label = matches.group("label")
value = matches.group("value")
if label in ["Identifier", "Trigger type", "AWS Region"]:
rule_info[label] = value.replace("\\", "")
elif label == "Parameters":
collecting_params = True
else:
print(f"ERROR: Unknown label: '{label}', line: '{line}'", file=sys.stderr)
return rule_info
def main():
"""Create a JSON file containing info pulled from AWS markdown files."""
req = requests.get(AWS_MARKDOWN_URL_START + LIST_OF_MARKDOWNS_URL)
# Extract the list of all the markdown files on the page.
link_pattern = re.compile(r"\+ \[[^\]]+\]\(([^)]+)\)")
markdown_files = link_pattern.findall(req.text)
markdown = {"ManagedRules": {}}
for markdown_file in markdown_files:
req = requests.get(AWS_MARKDOWN_URL_START + markdown_file)
rules = managed_rule_info(req.text.split("\n"))
rule_id = rules.pop("Identifier")
markdown["ManagedRules"][rule_id] = rules
with open(MANAGED_RULES_OUTPUT_FILENAME, "w") as fhandle:
json.dump(markdown, fhandle, sort_keys=True, indent=2)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,728 @@
"""Unit tests specific to the ConfigService ConfigRule APIs.
These APIs include:
put_config_rule
describe_config_rule
delete_config_rule
"""
from io import BytesIO
import json
from string import ascii_lowercase
from zipfile import ZipFile, ZIP_DEFLATED
import boto3
from botocore.exceptions import ClientError
import pytest
from moto.config import mock_config
from moto.config.models import random_string
from moto.config.models import ConfigRule, CONFIG_RULE_PAGE_SIZE
from moto.core import ACCOUNT_ID
from moto import settings, mock_iam, mock_lambda
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
def managed_config_rule():
"""Return a valid managed AWS Config Rule."""
return {
"ConfigRuleName": f"managed_rule_{random_string()}",
"Description": "Managed S3 Public Read Prohibited Bucket Rule",
"Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket", "AWS::IAM::Group"]},
"Source": {
"Owner": "AWS",
"SourceIdentifier": "S3_BUCKET_PUBLIC_READ_PROHIBITED",
},
"MaximumExecutionFrequency": "One_Hour",
}
def custom_config_rule():
"""Return a valid custom AWS Config Rule."""
return {
"ConfigRuleName": f"custom_rule_{random_string()}",
"Description": "Custom S3 Public Read Prohibited Bucket Rule",
"Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket", "AWS::IAM::Group"]},
"Source": {
"Owner": "CUSTOM_LAMBDA",
"SourceIdentifier": f"arn:aws:lambda:{TEST_REGION}:{ACCOUNT_ID}:function:test_config_rule",
"SourceDetails": [
{
"EventSource": "aws.config",
"MessageType": "ScheduledNotification",
"MaximumExecutionFrequency": "Three_Hours",
},
],
},
"MaximumExecutionFrequency": "One_Hour",
}
def zipped_lambda_function():
"""Return a simple test lambda function, zipped."""
func_str = """
def lambda_handler(event, context):
print("testing")
return event
"""
zip_output = BytesIO()
with ZipFile(zip_output, "w", ZIP_DEFLATED) as zip_file:
zip_file.writestr("lambda_function.py", func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
@mock_lambda
def create_lambda_for_config_rule():
"""Return the ARN of a lambda that can be used by a custom rule."""
role_name = "test-role"
lambda_role = None
with mock_iam():
iam_client = boto3.client("iam", region_name=TEST_REGION)
try:
lambda_role = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]
except ClientError:
lambda_role = iam_client.create_role(
RoleName=role_name, AssumeRolePolicyDocument="test policy", Path="/",
)["Role"]["Arn"]
# Create the lambda function and identify its location.
lambda_client = boto3.client("lambda", region_name=TEST_REGION)
lambda_client.create_function(
FunctionName="test_config_rule",
Runtime="python3.8",
Role=lambda_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zipped_lambda_function()},
Description="Lambda test function for config rule",
Timeout=3,
MemorySize=128,
Publish=True,
)
@mock_config
def test_put_config_rule_errors():
"""Test various error conditions in put_config_rule API call."""
client = boto3.client("config", region_name=TEST_REGION)
rule_name_base = "cf_limit_test"
for idx in range(ConfigRule.MAX_RULES):
managed_rule = managed_config_rule()
managed_rule["ConfigRuleName"] = f"{rule_name_base}_{idx}"
client.put_config_rule(ConfigRule=managed_rule)
with pytest.raises(ClientError) as exc:
managed_rule = managed_config_rule()
managed_rule["ConfigRuleName"] = f"{rule_name_base}_{ConfigRule.MAX_RULES}"
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "MaxNumberOfConfigRulesExceededException"
assert "maximum number of config rules" in err["Message"]
# Free up the memory from the limits test.
for idx in range(ConfigRule.MAX_RULES):
client.delete_config_rule(ConfigRuleName=f"{rule_name_base}_{idx}")
# Rule name that exceeds 128 chars in length.
rule_name = ascii_lowercase * 5
managed_rule = managed_config_rule()
managed_rule["ConfigRuleName"] = rule_name
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "Member must have length less than or equal to 128" in err["Message"]
@mock_config
def test_put_config_rule_update_errors():
"""Test various error conditions when updating ConfigRule."""
client = boto3.client("config", region_name=TEST_REGION)
# No name, arn or id.
managed_rule = {
"Description": "Managed S3 Public Read Prohibited Bucket Rule",
"Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket"]},
"Source": {
"Owner": "AWS",
"SourceIdentifier": "S3_BUCKET_PUBLIC_READ_PROHIBITED",
},
}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"One or more identifiers needs to be provided. Provide Name or Id or Arn"
in err["Message"]
)
# Provide an id for a rule that does not exist.
managed_rule = {
"ConfigRuleId": "foo",
"Description": "Managed S3 Public Read Prohibited Bucket Rule",
"Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket"]},
"Source": {
"Owner": "AWS",
"SourceIdentifier": "S3_BUCKET_PUBLIC_READ_PROHIBITED",
},
}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"One or more identifiers needs to be provided. Provide Name or Id or Arn"
in err["Message"]
)
@mock_config
def test_config_rule_errors(): # pylint: disable=too-many-statements
"""Test various error conditions in ConfigRule instantiation."""
client = boto3.client("config", region_name=TEST_REGION)
# Missing fields (ParamValidationError) caught by botocore and not
# tested here: ConfigRule.Source, ConfigRule.ConfigRuleName
managed_rule = managed_config_rule()
managed_rule["ConfigRuleArn"] = "arn"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"ConfigRule Arn and Id can not be specified when creating a new "
"ConfigRule." in err["Message"]
)
managed_rule = managed_config_rule()
bad_json_string = "{'name': 'test', 'type': null, }"
managed_rule["InputParameters"] = bad_json_string
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
f"Invalid json {bad_json_string} passed in the InputParameters field"
in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["MaximumExecutionFrequency"] = "HOUR"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Member must satisfy enum value set: {One_Hour, Six_Hours, "
"Three_Hours, Twelve_Hours, TwentyFour_Hours}" in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["ConfigRuleState"] = "BOGUS"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Value 'BOGUS' at 'configRule.configRuleState' failed to satisfy "
"constraint: Member must satisfy enum value set: {ACTIVE, "
"DELETING, DELETING_RESULTS, EVALUATING}" in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["ConfigRuleState"] = "DELETING"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"The ConfigRuleState DELETING is invalid. Only the following values "
"are permitted: ACTIVE" in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["CreatedBy"] = "tester"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"AWS Config populates the CreatedBy field for ServiceLinkedConfigRule. "
"Try again without populating the CreatedBy field" in err["Message"]
)
@mock_config
def test_aws_managed_rule_errors():
"""Test various error conditions in ConfigRule instantiation."""
client = boto3.client("config", region_name=TEST_REGION)
# Extra, unknown input parameter should raise an error.
managed_rule = managed_config_rule()
managed_rule["Source"]["SourceIdentifier"] = "IAM_PASSWORD_POLICY"
managed_rule["InputParameters"] = '{"RequireNumbers":"true","Extra":"10"}'
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
f"Unknown parameters provided in the inputParameters: "
f"{managed_rule['InputParameters']}" in err["Message"]
)
# Missing required parameters should raise an error.
managed_rule = managed_config_rule()
managed_rule["Source"]["SourceIdentifier"] = "CLOUDWATCH_ALARM_ACTION_CHECK"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"The required parameter [alarmActionRequired, "
"insufficientDataActionRequired, okActionRequired] is not present "
"in the inputParameters" in err["Message"]
)
# If no MaxExecutionFrequency specified, set it to the default.
# rule_name = f"managed_rule_{random_string()}"
# managed_rule = {
# "ConfigRuleName": rule_name,
# "Description": "Managed S3 Public Read Prohibited Bucket Rule",
# "Scope": {"ComplianceResourceTypes": ["AWS::IAM::Group"]},
# "Source": {
# "Owner": "AWS",
# "SourceIdentifier": "IAM_PASSWORD_POLICY",
# },
# }
# client.put_config_rule(ConfigRule=managed_rule)
# rsp = client.describe_config_rules(ConfigRuleNames=[rule_name])
# new_config_rule = rsp["ConfigRules"][0]
# assert new_config_rule["MaximumExecutionFrequency"] == "TwentyFour_Hours"
@mock_config
def test_config_rules_scope_errors(): # pylint: disable=too-many-statements
"""Test various error conditions in ConfigRule.Scope instantiation."""
client = boto3.client("config", region_name=TEST_REGION)
managed_rule = managed_config_rule()
managed_rule["Scope"]["TagValue"] = "tester"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"Tag key should not be empty when tag value is provided in scope"
in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["Scope"]["ComplianceResourceId"] = "12345"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"A single resourceType should be provided when resourceId is provided "
"in scope" in err["Message"]
)
managed_rule = managed_config_rule()
tag_key = "hellobye" * 16 + "x"
managed_rule["Scope"]["TagKey"] = tag_key
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"Value '{tag_key}' at 'ConfigRule.Scope.TagKey' failed to satisfy "
f"constraint: Member must have length less than or equal to 128"
in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["Scope"]["TagKey"] = "test"
tag_value = "01234567890123456" * 16 + "x"
managed_rule["Scope"]["TagValue"] = tag_value
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
f"Value '{tag_value}' at 'ConfigRule.Scope.TagValue' failed to "
f"satisfy constraint: Member must have length less than or equal to "
f"256" in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["Scope"]["TagKey"] = "test"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert "Scope cannot be applied to both resource and tag" in err["Message"]
managed_rule = managed_config_rule()
managed_rule["Scope"]["TagKey"] = "test_key"
managed_rule["Scope"]["ComplianceResourceTypes"] = []
managed_rule["Scope"]["ComplianceResourceId"] = "12345"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert "Scope cannot be applied to both resource and tag" in err["Message"]
@mock_config
def test_config_rules_source_errors(): # pylint: disable=too-many-statements
"""Test various error conditions in ConfigRule.Source instantiation."""
client = boto3.client("config", region_name=TEST_REGION)
# Missing fields (ParamValidationError) caught by botocore and not
# tested here: ConfigRule.Source.SourceIdentifier
managed_rule = managed_config_rule()
managed_rule["Source"]["Owner"] = "test"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "Member must satisfy enum value set: {AWS, CUSTOM_LAMBDA}" in err["Message"]
managed_rule = managed_config_rule()
managed_rule["Source"]["SourceIdentifier"] = "test"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"The sourceIdentifier test is invalid. Please refer to the "
"documentation for a list of valid sourceIdentifiers that can be used "
"when AWS is the Owner" in err["Message"]
)
managed_rule = managed_config_rule()
managed_rule["Source"]["SourceDetails"] = [
{"EventSource": "aws.config", "MessageType": "ScheduledNotification"}
]
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=managed_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"SourceDetails should be null/empty if the owner is AWS. "
"SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
in err["Message"]
)
custom_rule = custom_config_rule()
custom_rule["Source"] = {
"Owner": "CUSTOM_LAMBDA",
"SourceIdentifier": "test",
}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"SourceDetails should be null/empty if the owner is AWS. "
"SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
in err["Message"]
)
custom_rule = custom_config_rule()
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InsufficientPermissionsException"
assert (
f'The AWS Lambda function {custom_rule["Source"]["SourceIdentifier"]} '
f"cannot be invoked. Check the specified function ARN, and check the "
f"function's permissions" in err["Message"]
)
@mock_config
def test_config_rules_source_details_errors():
"""Test error conditions with ConfigRule.Source_Details instantiation."""
client = boto3.client("config", region_name=TEST_REGION)
create_lambda_for_config_rule()
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0] = {"MessageType": "ScheduledNotification"}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ParamValidationError"
assert (
"Missing required parameter in ConfigRule.SourceDetails: 'EventSource'"
in err["Message"]
)
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0]["EventSource"] = "foo"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert "Member must satisfy enum value set: {aws.config}" in err["Message"]
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0] = {"EventSource": "aws.config"}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ParamValidationError"
assert (
"Missing required parameter in ConfigRule.SourceDetails: 'MessageType'"
in err["Message"]
)
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0] = {
"MessageType": "foo",
"EventSource": "aws.config",
}
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Member must satisfy enum value set: "
"{ConfigurationItemChangeNotification, "
"ConfigurationSnapshotDeliveryCompleted, "
"OversizedConfigurationItemChangeNotification, ScheduledNotification}"
in err["Message"]
)
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"] = "foo"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "ValidationException"
assert (
"Member must satisfy enum value set: "
"{One_Hour, Six_Hours, Three_Hours, Twelve_Hours, TwentyFour_Hours}"
in err["Message"]
)
custom_rule = custom_config_rule()
custom_rule["Source"]["SourceDetails"][0][
"MessageType"
] = "ConfigurationItemChangeNotification"
with pytest.raises(ClientError) as exc:
client.put_config_rule(ConfigRule=custom_rule)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterValueException"
assert (
"A maximum execution frequency is not allowed if MessageType "
"is ConfigurationItemChangeNotification or "
"OversizedConfigurationItemChangeNotification" in err["Message"]
)
@mock_config
def test_valid_put_config_managed_rule():
"""Test valid put_config_rule API calls for managed rules."""
client = boto3.client("config", region_name=TEST_REGION)
# Create managed rule and compare input against describe_config_rule()
# output.
managed_rule = managed_config_rule()
managed_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::IAM::Group"]
managed_rule["Scope"]["ComplianceResourceId"] = "basic_test"
managed_rule["InputParameters"] = '{"RequireUppercaseCharacters":"true"}'
managed_rule["ConfigRuleState"] = "ACTIVE"
client.put_config_rule(ConfigRule=managed_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[managed_rule["ConfigRuleName"]])
managed_rule_json = json.dumps(managed_rule, sort_keys=True)
new_config_rule = rsp["ConfigRules"][0]
rule_arn = new_config_rule.pop("ConfigRuleArn")
rule_id = new_config_rule.pop("ConfigRuleId")
rsp_json = json.dumps(new_config_rule, sort_keys=True)
assert managed_rule_json == rsp_json
# Update managed rule and compare again.
managed_rule["ConfigRuleArn"] = rule_arn
managed_rule["ConfigRuleId"] = rule_id
managed_rule["Description"] = "Updated Managed S3 Public Read Rule"
managed_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::S3::Bucket"]
managed_rule["Scope"]["ComplianceResourceId"] = "S3-BUCKET_VERSIONING_ENABLED"
managed_rule["MaximumExecutionFrequency"] = "Six_Hours"
managed_rule["InputParameters"] = "{}"
client.put_config_rule(ConfigRule=managed_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[managed_rule["ConfigRuleName"]])
managed_rule_json = json.dumps(managed_rule, sort_keys=True)
rsp_json = json.dumps(rsp["ConfigRules"][0], sort_keys=True)
assert managed_rule_json == rsp_json
@mock_config
def test_valid_put_config_custom_rule():
"""Test valid put_config_rule API calls for custom rules."""
client = boto3.client("config", region_name=TEST_REGION)
# Create custom rule and compare input against describe_config_rule
# output.
create_lambda_for_config_rule()
custom_rule = custom_config_rule()
custom_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::IAM::Group"]
custom_rule["Scope"]["ComplianceResourceId"] = "basic_custom_test"
custom_rule["InputParameters"] = '{"TestName":"true"}'
custom_rule["ConfigRuleState"] = "ACTIVE"
client.put_config_rule(ConfigRule=custom_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[custom_rule["ConfigRuleName"]])
custom_rule_json = json.dumps(custom_rule, sort_keys=True)
new_config_rule = rsp["ConfigRules"][0]
rule_arn = new_config_rule.pop("ConfigRuleArn")
rule_id = new_config_rule.pop("ConfigRuleId")
rsp_json = json.dumps(new_config_rule, sort_keys=True)
assert custom_rule_json == rsp_json
# Update custom rule and compare again.
custom_rule["ConfigRuleArn"] = rule_arn
custom_rule["ConfigRuleId"] = rule_id
custom_rule["Description"] = "Updated Managed S3 Public Read Rule"
custom_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::S3::Bucket"]
custom_rule["Scope"]["ComplianceResourceId"] = "S3-BUCKET_VERSIONING_ENABLED"
custom_rule["Source"]["SourceDetails"][0] = {
"EventSource": "aws.config",
"MessageType": "ConfigurationItemChangeNotification",
}
custom_rule["InputParameters"] = "{}"
client.put_config_rule(ConfigRule=custom_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[custom_rule["ConfigRuleName"]])
custom_rule_json = json.dumps(custom_rule, sort_keys=True)
rsp_json = json.dumps(rsp["ConfigRules"][0], sort_keys=True)
assert custom_rule_json == rsp_json
# Update a custom rule specifying just the rule Id. Test the default
# value for MaximumExecutionFrequency while we're at it.
del custom_rule["ConfigRuleArn"]
rule_name = custom_rule.pop("ConfigRuleName")
custom_rule["Source"]["SourceDetails"][0] = {
"EventSource": "aws.config",
"MessageType": "ConfigurationSnapshotDeliveryCompleted",
}
client.put_config_rule(ConfigRule=custom_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[rule_name])
updated_rule = rsp["ConfigRules"][0]
assert updated_rule["ConfigRuleName"] == rule_name
assert (
updated_rule["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"]
== "TwentyFour_Hours"
)
# Update a custom rule specifying just the rule ARN.
custom_rule["ConfigRuleArn"] = rule_arn
del custom_rule["ConfigRuleId"]
custom_rule["MaximumExecutionFrequency"] = "Six_Hours"
client.put_config_rule(ConfigRule=custom_rule)
rsp = client.describe_config_rules(ConfigRuleNames=[rule_name])
updated_rule = rsp["ConfigRules"][0]
assert updated_rule["ConfigRuleName"] == rule_name
assert updated_rule["MaximumExecutionFrequency"] == "Six_Hours"
@mock_config
def test_describe_config_rules():
"""Test the describe_config_rules API."""
client = boto3.client("config", region_name=TEST_REGION)
response = client.describe_config_rules()
assert len(response["ConfigRules"]) == 0
rule_name_base = "describe_test"
for idx in range(ConfigRule.MAX_RULES):
managed_rule = managed_config_rule()
managed_rule["ConfigRuleName"] = f"{rule_name_base}_{idx}"
client.put_config_rule(ConfigRule=managed_rule)
with pytest.raises(ClientError) as exc:
client.describe_config_rules(
ConfigRuleNames=[
f"{rule_name_base}_1",
f"{rule_name_base}_10",
"fooey",
f"{rule_name_base}_20",
]
)
err = exc.value.response["Error"]
assert err["Code"] == "NoSuchConfigRuleException"
assert "The ConfigRule 'fooey' provided in the request is invalid" in err["Message"]
# Request three specific ConfigRules.
response = client.describe_config_rules(
ConfigRuleNames=[
f"{rule_name_base}_1",
f"{rule_name_base}_10",
f"{rule_name_base}_20",
]
)
assert len(response["ConfigRules"]) == 3
# By default, if no ConfigRules are specified, all that can be fit on a
# "page" will be returned.
response = client.describe_config_rules()
assert len(response["ConfigRules"]) == CONFIG_RULE_PAGE_SIZE
# Test a bad token.
with pytest.raises(ClientError) as exc:
client.describe_config_rules(NextToken="foo")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidNextTokenException"
assert "The nextToken provided is invalid" in err["Message"]
# Loop using tokens, verifying the tokens are as expected.
expected_tokens = [ # Non-alphanumeric sorted token numbers
f"{rule_name_base}_120",
f"{rule_name_base}_143",
f"{rule_name_base}_31",
f"{rule_name_base}_54",
f"{rule_name_base}_77",
None,
]
idx = 0
token = f"{rule_name_base}_0"
while token:
rsp = client.describe_config_rules(NextToken=token)
token = rsp.get("NextToken")
assert token == expected_tokens[idx]
idx += 1
@mock_config
def test_delete_config_rules():
"""Test the delete_config_rule API."""
client = boto3.client("config", region_name=TEST_REGION)
# Create a ConfigRule:
rule_name = "test_delete_config_rule"
managed_rule = managed_config_rule()
managed_rule["ConfigRuleName"] = rule_name
client.put_config_rule(ConfigRule=managed_rule)
# Delete it:
client.delete_config_rule(ConfigRuleName=rule_name)
# Verify that none are there:
assert not client.describe_config_rules()["ConfigRules"]
# Try it again -- it should error indicating the rule could not be found.
with pytest.raises(ClientError) as exc:
client.delete_config_rule(ConfigRuleName=rule_name)
err = exc.value.response["Error"]
assert err["Code"] == "NoSuchConfigRuleException"
assert (
f"The ConfigRule '{rule_name}' provided in the request is invalid"
in err["Message"]
)

View File

@ -125,7 +125,28 @@ def test_tag_resource():
updated_rsp = client.list_tags_for_resource(ResourceArn=agg_auth_arn)
assert tags == updated_rsp["Tags"]
# Verify keys added to ConfigRule, when implemented.
# Verify keys added to ConfigRule.
config_rule_name = f"config-rule-test-{random_string()}"
client.put_config_rule(
ConfigRule={
"ConfigRuleName": config_rule_name,
"Scope": {"ComplianceResourceTypes": ["AWS::IAM::Group"]},
"Source": {"Owner": "AWS", "SourceIdentifier": "IAM_PASSWORD_POLICY"},
},
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(10)],
)
config_rule = client.describe_config_rules(ConfigRuleNames=[config_rule_name])[
"ConfigRules"
][0]
config_rule_arn = config_rule["ConfigRuleArn"]
rsp = client.list_tags_for_resource(ResourceArn=config_rule_arn)
tags = rsp["Tags"]
client.tag_resource(ResourceArn=config_rule_arn, Tags=new_tags)
tags.extend(new_tags)
updated_rsp = client.list_tags_for_resource(ResourceArn=config_rule_arn)
assert tags == updated_rsp["Tags"]
@mock_config
@ -225,7 +246,28 @@ def test_untag_resource():
updated_rsp = client.list_tags_for_resource(ResourceArn=good_arn)
assert not updated_rsp["Tags"]
# Verify keys removed from ConfigRule, when implemented.
# Verify keys removed from ConfigRule. Add a new tag to the current set
# of tags, then delete the new tag. The original set of tags should remain.
rule_name = f"config-rule-delete-tags-test-{random_string()}"
client.put_config_rule(
ConfigRule={
"ConfigRuleName": rule_name,
"Scope": {"ComplianceResourceTypes": ["AWS::IAM::Group"]},
"Source": {"Owner": "AWS", "SourceIdentifier": "IAM_PASSWORD_POLICY"},
},
Tags=[{"Key": f"{x}", "Value": f"{x}"} for x in range(10)],
)
config_rule_arn = client.describe_config_rules(ConfigRuleNames=[rule_name])[
"ConfigRules"
][0]["ConfigRuleArn"]
tags = client.list_tags_for_resource(ResourceArn=config_rule_arn)["Tags"]
test_tags = [{"Key": "cr_test_key", "Value": "cr_test_value"}]
client.tag_resource(ResourceArn=config_rule_arn, Tags=test_tags)
client.untag_resource(ResourceArn=config_rule_arn, TagKeys=[test_tags[0]["Key"]])
updated_rsp = client.list_tags_for_resource(ResourceArn=config_rule_arn)
assert tags == updated_rsp["Tags"]
@mock_config