# moto/tests/test_config/test_config_rules_integration.py
#
# Captured from a web "blame" view of the file (295 lines, 11 KiB, Python).
# That view ignored the revisions listed in .git-blame-ignore-revs.

import json
from io import BytesIO
from uuid import uuid4
from zipfile import ZIP_DEFLATED, ZipFile
import boto3
import pytest
from botocore.exceptions import ClientError
# blame timestamp: 2024-01-07 12:03:33 +00:00
from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from .test_config_rules import TEST_REGION, managed_config_rule
def custom_config_rule(func_name="test_config_rule"):
    """Return a valid custom AWS Config Rule.

    Args:
        func_name: Name of the Lambda function the rule points at.  It is
            embedded in the ``SourceIdentifier`` ARN together with
            ``TEST_REGION`` and ``ACCOUNT_ID``.

    Returns:
        A freshly built dict on every call, so callers may mutate it.
    """
    # The slice has to live inside the replacement field.  Written as
    # f"...{uuid4()}[:6]" the "[:6]" is literal text and ends up in the name.
    name_suffix = str(uuid4())[:6]
    return {
        "ConfigRuleName": f"custom_rule_{name_suffix}",
        "Description": "Custom S3 Public Read Prohibited Bucket Rule",
        "Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket", "AWS::IAM::Group"]},
        "Source": {
            "Owner": "CUSTOM_LAMBDA",
            "SourceIdentifier": f"arn:aws:lambda:{TEST_REGION}:{ACCOUNT_ID}:function:{func_name}",
            "SourceDetails": [
                {
                    "EventSource": "aws.config",
                    "MessageType": "ScheduledNotification",
                    "MaximumExecutionFrequency": "Three_Hours",
                },
            ],
        },
        "MaximumExecutionFrequency": "One_Hour",
    }
def zipped_lambda_function():
    """Return a simple test lambda function, zipped.

    Returns:
        The bytes of a deflate-compressed zip archive with a single member,
        ``lambda_function.py``, which defines ``lambda_handler``.
    """
    # The handler body must be indented, otherwise the archived module is
    # not valid Python.
    func_str = """
def lambda_handler(event, context):
    print("testing")
    return event
"""
    zip_output = BytesIO()
    # Leaving the ``with`` block finalises the archive; an explicit
    # ``close()`` inside it is redundant.
    with ZipFile(zip_output, "w", ZIP_DEFLATED) as zip_file:
        zip_file.writestr("lambda_function.py", func_str)
    # ZipFile does not close a file object it was handed, so the buffer is
    # still readable here.
    return zip_output.getvalue()
def create_lambda_for_config_rule():
    """Create the lambda used by a custom rule and return its ARN.

    The IAM role ``test-role`` is reused when ``get_role`` succeeds and
    created otherwise.  The function is named ``test_config_rule``, which is
    the default ``func_name`` of ``custom_config_rule``.

    Returns:
        The ``FunctionArn`` reported by the ``create_function`` call.
    """
    role_name = "test-role"
    iam_client = boto3.client("iam", region_name=TEST_REGION)
    try:
        lambda_role = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]
    except ClientError:
        # Any ClientError from get_role is treated as "role not there yet".
        lambda_role = iam_client.create_role(
            RoleName=role_name, AssumeRolePolicyDocument="test policy", Path="/"
        )["Role"]["Arn"]

    # Create the lambda function and identify its location.
    lambda_client = boto3.client("lambda", region_name=TEST_REGION)
    response = lambda_client.create_function(
        FunctionName="test_config_rule",
        Runtime="python3.11",
        Role=lambda_role,
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zipped_lambda_function()},
        Description="Lambda test function for config rule",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Callers that ignore the result are unaffected by the new return value.
    return response["FunctionArn"]
# blame timestamp: 2024-01-07 12:03:33 +00:00
@mock_aws
def test_config_rules_source_details_errors():
    """Check put_config_rule failures caused by bad Source.SourceDetails.

    Every case starts from a fresh custom rule, alters its first
    SourceDetails entry, and verifies the error code and message.
    """
    config = boto3.client("config", region_name=TEST_REGION)
    create_lambda_for_config_rule()

    def rejected(rule):
        """Submit *rule*, require a ClientError, and return its Error dict."""
        with pytest.raises(ClientError) as caught:
            config.put_config_rule(ConfigRule=rule)
        return caught.value.response["Error"]

    # EventSource missing.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {"MessageType": "ScheduledNotification"}
    error = rejected(rule)
    assert error["Code"] == "ParamValidationError"
    expected = "Missing required parameter in ConfigRule.SourceDetails: 'EventSource'"
    assert expected in error["Message"]

    # EventSource outside its enum.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0]["EventSource"] = "foo"
    error = rejected(rule)
    assert error["Code"] == "ValidationException"
    assert "Member must satisfy enum value set: {aws.config}" in error["Message"]

    # MessageType missing.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {"EventSource": "aws.config"}
    error = rejected(rule)
    assert error["Code"] == "ParamValidationError"
    expected = "Missing required parameter in ConfigRule.SourceDetails: 'MessageType'"
    assert expected in error["Message"]

    # MessageType outside its enum.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {
        "MessageType": "foo",
        "EventSource": "aws.config",
    }
    error = rejected(rule)
    assert error["Code"] == "ValidationException"
    expected = (
        "Member must satisfy enum value set: "
        "{ConfigurationItemChangeNotification, "
        "ConfigurationSnapshotDeliveryCompleted, "
        "OversizedConfigurationItemChangeNotification, ScheduledNotification}"
    )
    assert expected in error["Message"]

    # MaximumExecutionFrequency outside its enum.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"] = "foo"
    error = rejected(rule)
    assert error["Code"] == "ValidationException"
    expected = (
        "Member must satisfy enum value set: "
        "{One_Hour, Six_Hours, Three_Hours, Twelve_Hours, TwentyFour_Hours}"
    )
    assert expected in error["Message"]

    # A frequency combined with a change-notification message type.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0]["MessageType"] = (
        "ConfigurationItemChangeNotification"
    )
    error = rejected(rule)
    assert error["Code"] == "InvalidParameterValueException"
    expected = (
        "A maximum execution frequency is not allowed if MessageType "
        "is ConfigurationItemChangeNotification or "
        "OversizedConfigurationItemChangeNotification"
    )
    assert expected in error["Message"]
# blame timestamp: 2024-01-07 12:03:33 +00:00
@mock_aws
def test_valid_put_config_custom_rule():
    """Check that valid put_config_rule calls work for custom rules.

    Creates a custom rule, then updates it three times: identified by
    name, by rule Id only, and by rule ARN only.
    """
    config = boto3.client("config", region_name=TEST_REGION)

    def stored_rule(name):
        """Return the first rule describe_config_rules reports for *name*."""
        reply = config.describe_config_rules(ConfigRuleNames=[name])
        return reply["ConfigRules"][0]

    def canonical(rule):
        """Serialise *rule* with sorted keys so two rules can be compared."""
        return json.dumps(rule, sort_keys=True)

    # Create the rule; describe output must equal the input once the
    # server-assigned ARN and Id are removed.
    create_lambda_for_config_rule()
    rule = custom_config_rule()
    rule["Scope"].update(
        ComplianceResourceTypes=["AWS::IAM::Group"],
        ComplianceResourceId="basic_custom_test",
    )
    rule.update(InputParameters='{"TestName":"true"}', ConfigRuleState="ACTIVE")
    config.put_config_rule(ConfigRule=rule)

    described = stored_rule(rule["ConfigRuleName"])
    rule_arn = described.pop("ConfigRuleArn")
    rule_id = described.pop("ConfigRuleId")
    assert canonical(rule) == canonical(described)

    # Update the rule (ARN and Id now part of the input) and compare again.
    rule.update(
        ConfigRuleArn=rule_arn,
        ConfigRuleId=rule_id,
        Description="Updated Managed S3 Public Read Rule",
    )
    rule["Scope"].update(
        ComplianceResourceTypes=["AWS::S3::Bucket"],
        ComplianceResourceId="S3-BUCKET_VERSIONING_ENABLED",
    )
    rule["Source"]["SourceDetails"][0] = {
        "EventSource": "aws.config",
        "MessageType": "ConfigurationItemChangeNotification",
    }
    rule["InputParameters"] = "{}"
    config.put_config_rule(ConfigRule=rule)
    assert canonical(rule) == canonical(stored_rule(rule["ConfigRuleName"]))

    # Update using only the rule Id.  No frequency is given in the new
    # SourceDetails entry, so this also checks the default value.
    del rule["ConfigRuleArn"]
    rule_name = rule.pop("ConfigRuleName")
    rule["Source"]["SourceDetails"][0] = {
        "EventSource": "aws.config",
        "MessageType": "ConfigurationSnapshotDeliveryCompleted",
    }
    config.put_config_rule(ConfigRule=rule)
    described = stored_rule(rule_name)
    assert described["ConfigRuleName"] == rule_name
    details = described["Source"]["SourceDetails"][0]
    assert details["MaximumExecutionFrequency"] == "TwentyFour_Hours"

    # Update using only the rule ARN.
    rule["ConfigRuleArn"] = rule_arn
    del rule["ConfigRuleId"]
    rule["MaximumExecutionFrequency"] = "Six_Hours"
    config.put_config_rule(ConfigRule=rule)
    described = stored_rule(rule_name)
    assert described["ConfigRuleName"] == rule_name
    assert described["MaximumExecutionFrequency"] == "Six_Hours"
# blame timestamp: 2024-01-07 12:03:33 +00:00
@mock_aws
def test_config_rules_source_errors():  # pylint: disable=too-many-statements
    """Check put_config_rule failures caused by a bad ConfigRule.Source."""
    config = boto3.client("config", region_name=TEST_REGION)

    def rejected(rule):
        """Submit *rule*, require a ClientError, and return its Error dict."""
        with pytest.raises(ClientError) as caught:
            config.put_config_rule(ConfigRule=rule)
        return caught.value.response["Error"]

    # Shared by the two SourceDetails/Owner mismatch cases below.
    details_mismatch = (
        "SourceDetails should be null/empty if the owner is AWS. "
        "SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
    )

    # Missing fields (ParamValidationError) caught by botocore and not
    # tested here: ConfigRule.Source.SourceIdentifier

    # Owner outside its enum.
    rule = managed_config_rule()
    rule["Source"]["Owner"] = "test"
    error = rejected(rule)
    assert error["Code"] == "ValidationException"
    expected = "Member must satisfy enum value set: {AWS, CUSTOM_LAMBDA}"
    assert expected in error["Message"]

    # Unknown identifier for an AWS-owned rule.
    rule = managed_config_rule()
    rule["Source"]["SourceIdentifier"] = "test"
    error = rejected(rule)
    assert error["Code"] == "InvalidParameterValueException"
    expected = (
        "The sourceIdentifier test is invalid. Please refer to the "
        "documentation for a list of valid sourceIdentifiers that can be used "
        "when AWS is the Owner"
    )
    assert expected in error["Message"]

    # SourceDetails supplied on a managed rule.
    rule = managed_config_rule()
    rule["Source"]["SourceDetails"] = [
        {"EventSource": "aws.config", "MessageType": "ScheduledNotification"}
    ]
    error = rejected(rule)
    assert error["Code"] == "InvalidParameterValueException"
    assert details_mismatch in error["Message"]

    # SourceDetails absent on a custom rule.
    rule = custom_config_rule()
    rule["Source"] = {
        "Owner": "CUSTOM_LAMBDA",
        "SourceIdentifier": "test",
    }
    error = rejected(rule)
    assert error["Code"] == "InvalidParameterValueException"
    assert details_mismatch in error["Message"]

    # Custom rule whose ARN names a function that was never created.
    rule = custom_config_rule(func_name="unknown_func")
    error = rejected(rule)
    assert error["Code"] == "InsufficientPermissionsException"
    function_arn = rule["Source"]["SourceIdentifier"]
    expected = (
        f"The AWS Lambda function {function_arn} "
        "cannot be invoked. Check the specified function ARN, and check the "
        "function's permissions"
    )
    assert expected in error["Message"]