295 lines
11 KiB
Python
295 lines
11 KiB
Python
from .test_config_rules import managed_config_rule, TEST_REGION
|
|
from botocore.exceptions import ClientError
|
|
from moto import mock_config, mock_iam, mock_lambda
|
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
|
from io import BytesIO
|
|
from uuid import uuid4
|
|
from zipfile import ZipFile, ZIP_DEFLATED
|
|
|
|
import boto3
|
|
import json
|
|
import pytest
|
|
|
|
|
|
def custom_config_rule(func_name="test_config_rule"):
|
|
"""Return a valid custom AWS Config Rule."""
|
|
return {
|
|
"ConfigRuleName": f"custom_rule_{uuid4()}[:6]",
|
|
"Description": "Custom S3 Public Read Prohibited Bucket Rule",
|
|
"Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket", "AWS::IAM::Group"]},
|
|
"Source": {
|
|
"Owner": "CUSTOM_LAMBDA",
|
|
"SourceIdentifier": f"arn:aws:lambda:{TEST_REGION}:{ACCOUNT_ID}:function:{func_name}",
|
|
"SourceDetails": [
|
|
{
|
|
"EventSource": "aws.config",
|
|
"MessageType": "ScheduledNotification",
|
|
"MaximumExecutionFrequency": "Three_Hours",
|
|
},
|
|
],
|
|
},
|
|
"MaximumExecutionFrequency": "One_Hour",
|
|
}
|
|
|
|
|
|
def zipped_lambda_function():
|
|
"""Return a simple test lambda function, zipped."""
|
|
func_str = """
|
|
def lambda_handler(event, context):
|
|
print("testing")
|
|
return event
|
|
"""
|
|
zip_output = BytesIO()
|
|
with ZipFile(zip_output, "w", ZIP_DEFLATED) as zip_file:
|
|
zip_file.writestr("lambda_function.py", func_str)
|
|
zip_file.close()
|
|
zip_output.seek(0)
|
|
return zip_output.read()
|
|
|
|
|
|
@mock_lambda
|
|
def create_lambda_for_config_rule():
|
|
"""Return the ARN of a lambda that can be used by a custom rule."""
|
|
role_name = "test-role"
|
|
lambda_role = None
|
|
with mock_iam():
|
|
iam_client = boto3.client("iam", region_name=TEST_REGION)
|
|
try:
|
|
lambda_role = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]
|
|
except ClientError:
|
|
lambda_role = iam_client.create_role(
|
|
RoleName=role_name, AssumeRolePolicyDocument="test policy", Path="/"
|
|
)["Role"]["Arn"]
|
|
|
|
# Create the lambda function and identify its location.
|
|
lambda_client = boto3.client("lambda", region_name=TEST_REGION)
|
|
lambda_client.create_function(
|
|
FunctionName="test_config_rule",
|
|
Runtime="python3.11",
|
|
Role=lambda_role,
|
|
Handler="lambda_function.lambda_handler",
|
|
Code={"ZipFile": zipped_lambda_function()},
|
|
Description="Lambda test function for config rule",
|
|
Timeout=3,
|
|
MemorySize=128,
|
|
Publish=True,
|
|
)
|
|
|
|
|
|
@mock_config
|
|
def test_config_rules_source_details_errors():
|
|
"""Test error conditions with ConfigRule.Source_Details instantiation."""
|
|
client = boto3.client("config", region_name=TEST_REGION)
|
|
|
|
create_lambda_for_config_rule()
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0] = {"MessageType": "ScheduledNotification"}
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ParamValidationError"
|
|
assert (
|
|
"Missing required parameter in ConfigRule.SourceDetails: 'EventSource'"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0]["EventSource"] = "foo"
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ValidationException"
|
|
assert "Member must satisfy enum value set: {aws.config}" in err["Message"]
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0] = {"EventSource": "aws.config"}
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ParamValidationError"
|
|
assert (
|
|
"Missing required parameter in ConfigRule.SourceDetails: 'MessageType'"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0] = {
|
|
"MessageType": "foo",
|
|
"EventSource": "aws.config",
|
|
}
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ValidationException"
|
|
assert (
|
|
"Member must satisfy enum value set: "
|
|
"{ConfigurationItemChangeNotification, "
|
|
"ConfigurationSnapshotDeliveryCompleted, "
|
|
"OversizedConfigurationItemChangeNotification, ScheduledNotification}"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"] = "foo"
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ValidationException"
|
|
assert (
|
|
"Member must satisfy enum value set: "
|
|
"{One_Hour, Six_Hours, Three_Hours, Twelve_Hours, TwentyFour_Hours}"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"]["SourceDetails"][0][
|
|
"MessageType"
|
|
] = "ConfigurationItemChangeNotification"
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "InvalidParameterValueException"
|
|
assert (
|
|
"A maximum execution frequency is not allowed if MessageType "
|
|
"is ConfigurationItemChangeNotification or "
|
|
"OversizedConfigurationItemChangeNotification" in err["Message"]
|
|
)
|
|
|
|
|
|
@mock_config
|
|
def test_valid_put_config_custom_rule():
|
|
"""Test valid put_config_rule API calls for custom rules."""
|
|
client = boto3.client("config", region_name=TEST_REGION)
|
|
# Create custom rule and compare input against describe_config_rule
|
|
# output.
|
|
create_lambda_for_config_rule()
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::IAM::Group"]
|
|
custom_rule["Scope"]["ComplianceResourceId"] = "basic_custom_test"
|
|
custom_rule["InputParameters"] = '{"TestName":"true"}'
|
|
custom_rule["ConfigRuleState"] = "ACTIVE"
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
|
|
rsp = client.describe_config_rules(ConfigRuleNames=[custom_rule["ConfigRuleName"]])
|
|
custom_rule_json = json.dumps(custom_rule, sort_keys=True)
|
|
new_config_rule = rsp["ConfigRules"][0]
|
|
rule_arn = new_config_rule.pop("ConfigRuleArn")
|
|
rule_id = new_config_rule.pop("ConfigRuleId")
|
|
rsp_json = json.dumps(new_config_rule, sort_keys=True)
|
|
assert custom_rule_json == rsp_json
|
|
|
|
# Update custom rule and compare again.
|
|
custom_rule["ConfigRuleArn"] = rule_arn
|
|
custom_rule["ConfigRuleId"] = rule_id
|
|
custom_rule["Description"] = "Updated Managed S3 Public Read Rule"
|
|
custom_rule["Scope"]["ComplianceResourceTypes"] = ["AWS::S3::Bucket"]
|
|
custom_rule["Scope"]["ComplianceResourceId"] = "S3-BUCKET_VERSIONING_ENABLED"
|
|
custom_rule["Source"]["SourceDetails"][0] = {
|
|
"EventSource": "aws.config",
|
|
"MessageType": "ConfigurationItemChangeNotification",
|
|
}
|
|
custom_rule["InputParameters"] = "{}"
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
|
|
rsp = client.describe_config_rules(ConfigRuleNames=[custom_rule["ConfigRuleName"]])
|
|
custom_rule_json = json.dumps(custom_rule, sort_keys=True)
|
|
rsp_json = json.dumps(rsp["ConfigRules"][0], sort_keys=True)
|
|
assert custom_rule_json == rsp_json
|
|
|
|
# Update a custom rule specifying just the rule Id. Test the default
|
|
# value for MaximumExecutionFrequency while we're at it.
|
|
del custom_rule["ConfigRuleArn"]
|
|
rule_name = custom_rule.pop("ConfigRuleName")
|
|
custom_rule["Source"]["SourceDetails"][0] = {
|
|
"EventSource": "aws.config",
|
|
"MessageType": "ConfigurationSnapshotDeliveryCompleted",
|
|
}
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
rsp = client.describe_config_rules(ConfigRuleNames=[rule_name])
|
|
updated_rule = rsp["ConfigRules"][0]
|
|
assert updated_rule["ConfigRuleName"] == rule_name
|
|
assert (
|
|
updated_rule["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"]
|
|
== "TwentyFour_Hours"
|
|
)
|
|
|
|
# Update a custom rule specifying just the rule ARN.
|
|
custom_rule["ConfigRuleArn"] = rule_arn
|
|
del custom_rule["ConfigRuleId"]
|
|
custom_rule["MaximumExecutionFrequency"] = "Six_Hours"
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
rsp = client.describe_config_rules(ConfigRuleNames=[rule_name])
|
|
updated_rule = rsp["ConfigRules"][0]
|
|
assert updated_rule["ConfigRuleName"] == rule_name
|
|
assert updated_rule["MaximumExecutionFrequency"] == "Six_Hours"
|
|
|
|
|
|
@mock_config
|
|
def test_config_rules_source_errors(): # pylint: disable=too-many-statements
|
|
"""Test various error conditions in ConfigRule.Source instantiation."""
|
|
client = boto3.client("config", region_name=TEST_REGION)
|
|
|
|
# Missing fields (ParamValidationError) caught by botocore and not
|
|
# tested here: ConfigRule.Source.SourceIdentifier
|
|
|
|
managed_rule = managed_config_rule()
|
|
managed_rule["Source"]["Owner"] = "test"
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=managed_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "ValidationException"
|
|
assert "Member must satisfy enum value set: {AWS, CUSTOM_LAMBDA}" in err["Message"]
|
|
|
|
managed_rule = managed_config_rule()
|
|
managed_rule["Source"]["SourceIdentifier"] = "test"
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=managed_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "InvalidParameterValueException"
|
|
assert (
|
|
"The sourceIdentifier test is invalid. Please refer to the "
|
|
"documentation for a list of valid sourceIdentifiers that can be used "
|
|
"when AWS is the Owner" in err["Message"]
|
|
)
|
|
|
|
managed_rule = managed_config_rule()
|
|
managed_rule["Source"]["SourceDetails"] = [
|
|
{"EventSource": "aws.config", "MessageType": "ScheduledNotification"}
|
|
]
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=managed_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "InvalidParameterValueException"
|
|
assert (
|
|
"SourceDetails should be null/empty if the owner is AWS. "
|
|
"SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule()
|
|
custom_rule["Source"] = {
|
|
"Owner": "CUSTOM_LAMBDA",
|
|
"SourceIdentifier": "test",
|
|
}
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "InvalidParameterValueException"
|
|
assert (
|
|
"SourceDetails should be null/empty if the owner is AWS. "
|
|
"SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
|
|
in err["Message"]
|
|
)
|
|
|
|
custom_rule = custom_config_rule(func_name="unknown_func")
|
|
with pytest.raises(ClientError) as exc:
|
|
client.put_config_rule(ConfigRule=custom_rule)
|
|
err = exc.value.response["Error"]
|
|
assert err["Code"] == "InsufficientPermissionsException"
|
|
assert (
|
|
f'The AWS Lambda function {custom_rule["Source"]["SourceIdentifier"]} '
|
|
f"cannot be invoked. Check the specified function ARN, and check the "
|
|
f"function's permissions" in err["Message"]
|
|
)
|