295 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from .test_config_rules import managed_config_rule, TEST_REGION
 | 
						|
from botocore.exceptions import ClientError
 | 
						|
from moto import mock_config, mock_iam, mock_lambda
 | 
						|
from moto.core import ACCOUNT_ID
 | 
						|
from io import BytesIO
 | 
						|
from uuid import uuid4
 | 
						|
from zipfile import ZipFile, ZIP_DEFLATED
 | 
						|
 | 
						|
import boto3
 | 
						|
import json
 | 
						|
import pytest
 | 
						|
 | 
						|
 | 
						|
def custom_config_rule(func_name="test_config_rule"):
    """Return a valid custom AWS Config Rule.

    Args:
        func_name: Name of the Lambda function placed at the end of the
            rule's ``SourceIdentifier`` ARN.

    Returns:
        A new dict on every call, in the shape the tests in this module
        pass to ``put_config_rule(ConfigRule=...)``.  The rule name carries
        a six-character random suffix.
    """
    # The slice has to be applied to the UUID string *inside* the f-string
    # replacement field; written after the closing brace it is just literal
    # text and the whole UUID ends up in the name.
    name_suffix = str(uuid4())[:6]
    return {
        "ConfigRuleName": f"custom_rule_{name_suffix}",
        "Description": "Custom S3 Public Read Prohibited Bucket Rule",
        "Scope": {"ComplianceResourceTypes": ["AWS::S3::Bucket", "AWS::IAM::Group"]},
        "Source": {
            "Owner": "CUSTOM_LAMBDA",
            "SourceIdentifier": f"arn:aws:lambda:{TEST_REGION}:{ACCOUNT_ID}:function:{func_name}",
            "SourceDetails": [
                {
                    "EventSource": "aws.config",
                    "MessageType": "ScheduledNotification",
                    "MaximumExecutionFrequency": "Three_Hours",
                },
            ],
        },
        "MaximumExecutionFrequency": "One_Hour",
    }
 | 
						|
 | 
						|
 | 
						|
def zipped_lambda_function():
    """Return a simple test lambda function, zipped.

    Returns:
        bytes: A DEFLATE-compressed zip archive with a single member,
        ``lambda_function.py``, defining ``lambda_handler(event, context)``.
    """
    func_str = """
def lambda_handler(event, context):
    print("testing")
    return event
"""
    zip_output = BytesIO()
    # Leaving the ``with`` block closes the archive, so no explicit close()
    # is needed; the buffer is read only after the archive is finalized.
    with ZipFile(zip_output, "w", ZIP_DEFLATED) as zip_file:
        zip_file.writestr("lambda_function.py", func_str)
    return zip_output.getvalue()
 | 
						|
 | 
						|
 | 
						|
@mock_lambda
def create_lambda_for_config_rule():
    """Return the ARN of a lambda that can be used by a custom rule.

    Looks up the IAM role ``test-role`` (creating it when the lookup raises
    ``ClientError``) and then creates the Lambda function
    ``test_config_rule`` in ``TEST_REGION`` with that role.

    Returns:
        The ``FunctionArn`` reported by ``create_function``.
    """
    role_name = "test-role"
    with mock_iam():
        iam_client = boto3.client("iam", region_name=TEST_REGION)
        try:
            lambda_role = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]
        except ClientError:
            # The role lookup failed, so create the role instead.
            lambda_role = iam_client.create_role(
                RoleName=role_name, AssumeRolePolicyDocument="test policy", Path="/"
            )["Role"]["Arn"]

    # Create the lambda function and identify its location.
    lambda_client = boto3.client("lambda", region_name=TEST_REGION)
    created = lambda_client.create_function(
        FunctionName="test_config_rule",
        Runtime="python3.8",
        Role=lambda_role,
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zipped_lambda_function()},
        Description="Lambda test function for config rule",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    # Hand the ARN back as the docstring promises; existing callers that
    # ignore the return value are unaffected.
    return created["FunctionArn"]
 | 
						|
 | 
						|
 | 
						|
@mock_config
def test_config_rules_source_details_errors():
    """Check that put_config_rule rejects bad ConfigRule.Source.SourceDetails."""
    config_client = boto3.client("config", region_name=TEST_REGION)

    create_lambda_for_config_rule()

    def rejected_with(rule):
        """Submit ``rule``, require a ClientError, and return its Error dict."""
        with pytest.raises(ClientError) as raised:
            config_client.put_config_rule(ConfigRule=rule)
        return raised.value.response["Error"]

    # SourceDetails entry without an EventSource.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {"MessageType": "ScheduledNotification"}
    error = rejected_with(rule)
    assert error["Code"] == "ParamValidationError"
    assert (
        "Missing required parameter in ConfigRule.SourceDetails: 'EventSource'"
        in error["Message"]
    )

    # EventSource outside the allowed enum.
    rule = custom_config_rule()
    details = rule["Source"]["SourceDetails"][0]
    details["EventSource"] = "foo"
    error = rejected_with(rule)
    assert error["Code"] == "ValidationException"
    assert "Member must satisfy enum value set: {aws.config}" in error["Message"]

    # SourceDetails entry without a MessageType.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {"EventSource": "aws.config"}
    error = rejected_with(rule)
    assert error["Code"] == "ParamValidationError"
    assert (
        "Missing required parameter in ConfigRule.SourceDetails: 'MessageType'"
        in error["Message"]
    )

    # MessageType outside the allowed enum.
    rule = custom_config_rule()
    rule["Source"]["SourceDetails"][0] = {
        "MessageType": "foo",
        "EventSource": "aws.config",
    }
    error = rejected_with(rule)
    assert error["Code"] == "ValidationException"
    assert (
        "Member must satisfy enum value set: "
        "{ConfigurationItemChangeNotification, "
        "ConfigurationSnapshotDeliveryCompleted, "
        "OversizedConfigurationItemChangeNotification, ScheduledNotification}"
        in error["Message"]
    )

    # MaximumExecutionFrequency outside the allowed enum.
    rule = custom_config_rule()
    details = rule["Source"]["SourceDetails"][0]
    details["MaximumExecutionFrequency"] = "foo"
    error = rejected_with(rule)
    assert error["Code"] == "ValidationException"
    assert (
        "Member must satisfy enum value set: "
        "{One_Hour, Six_Hours, Three_Hours, Twelve_Hours, TwentyFour_Hours}"
        in error["Message"]
    )

    # The default rule's details keep their MaximumExecutionFrequency, which
    # is rejected once the MessageType becomes a change notification.
    rule = custom_config_rule()
    details = rule["Source"]["SourceDetails"][0]
    details["MessageType"] = "ConfigurationItemChangeNotification"
    error = rejected_with(rule)
    assert error["Code"] == "InvalidParameterValueException"
    assert (
        "A maximum execution frequency is not allowed if MessageType "
        "is ConfigurationItemChangeNotification or "
        "OversizedConfigurationItemChangeNotification" in error["Message"]
    )
 | 
						|
 | 
						|
 | 
						|
@mock_config
def test_valid_put_config_custom_rule():
    """Exercise successful put_config_rule calls for a custom rule."""
    config_client = boto3.client("config", region_name=TEST_REGION)

    def canonical(rule):
        """Serialize ``rule`` with sorted keys so two rules can be compared."""
        return json.dumps(rule, sort_keys=True)

    def describe(name):
        """Return the first rule describe_config_rules reports for ``name``."""
        found = config_client.describe_config_rules(ConfigRuleNames=[name])
        return found["ConfigRules"][0]

    # Create the rule, then check that describe_config_rules echoes the input.
    create_lambda_for_config_rule()
    rule = custom_config_rule()
    rule["Scope"].update(
        ComplianceResourceTypes=["AWS::IAM::Group"],
        ComplianceResourceId="basic_custom_test",
    )
    rule.update(InputParameters='{"TestName":"true"}', ConfigRuleState="ACTIVE")
    config_client.put_config_rule(ConfigRule=rule)

    described = describe(rule["ConfigRuleName"])
    # ConfigRuleArn and ConfigRuleId appear only in the response; take them
    # out before comparing and keep them for the updates below.
    rule_arn = described.pop("ConfigRuleArn")
    rule_id = described.pop("ConfigRuleId")
    assert canonical(rule) == canonical(described)

    # Update the rule (identified by name, ARN and Id) and compare again.
    rule.update(
        ConfigRuleArn=rule_arn,
        ConfigRuleId=rule_id,
        Description="Updated Managed S3 Public Read Rule",
    )
    rule["Scope"].update(
        ComplianceResourceTypes=["AWS::S3::Bucket"],
        ComplianceResourceId="S3-BUCKET_VERSIONING_ENABLED",
    )
    rule["Source"]["SourceDetails"][0] = {
        "EventSource": "aws.config",
        "MessageType": "ConfigurationItemChangeNotification",
    }
    rule["InputParameters"] = "{}"
    config_client.put_config_rule(ConfigRule=rule)

    assert canonical(rule) == canonical(describe(rule["ConfigRuleName"]))

    # Update with only the rule Id as identifier.  The new SourceDetails
    # entry omits MaximumExecutionFrequency, so the default is checked too.
    del rule["ConfigRuleArn"]
    rule_name = rule.pop("ConfigRuleName")
    rule["Source"]["SourceDetails"][0] = {
        "EventSource": "aws.config",
        "MessageType": "ConfigurationSnapshotDeliveryCompleted",
    }
    config_client.put_config_rule(ConfigRule=rule)
    described = describe(rule_name)
    assert described["ConfigRuleName"] == rule_name
    assert (
        described["Source"]["SourceDetails"][0]["MaximumExecutionFrequency"]
        == "TwentyFour_Hours"
    )

    # Update with only the rule ARN as identifier.
    rule["ConfigRuleArn"] = rule_arn
    del rule["ConfigRuleId"]
    rule["MaximumExecutionFrequency"] = "Six_Hours"
    config_client.put_config_rule(ConfigRule=rule)
    described = describe(rule_name)
    assert described["ConfigRuleName"] == rule_name
    assert described["MaximumExecutionFrequency"] == "Six_Hours"
 | 
						|
 | 
						|
 | 
						|
@mock_config
def test_config_rules_source_errors():  # pylint: disable=too-many-statements
    """Test various error conditions in ConfigRule.Source instantiation.

    Each scenario builds a fresh rule, breaks one aspect of its ``Source``
    and checks the error code and message of the resulting ``ClientError``.
    """
    client = boto3.client("config", region_name=TEST_REGION)

    # Missing fields (ParamValidationError) caught by botocore and not
    # tested here:  ConfigRule.Source.SourceIdentifier

    # Owner outside the allowed enum.
    managed_rule = managed_config_rule()
    managed_rule["Source"]["Owner"] = "test"
    with pytest.raises(ClientError) as exc:
        client.put_config_rule(ConfigRule=managed_rule)
    err = exc.value.response["Error"]
    assert err["Code"] == "ValidationException"
    assert "Member must satisfy enum value set: {AWS, CUSTOM_LAMBDA}" in err["Message"]

    # Managed rule with an invalid SourceIdentifier.
    managed_rule = managed_config_rule()
    managed_rule["Source"]["SourceIdentifier"] = "test"
    with pytest.raises(ClientError) as exc:
        client.put_config_rule(ConfigRule=managed_rule)
    err = exc.value.response["Error"]
    assert err["Code"] == "InvalidParameterValueException"
    assert (
        "The sourceIdentifier test is invalid.  Please refer to the "
        "documentation for a list of valid sourceIdentifiers that can be used "
        "when AWS is the Owner" in err["Message"]
    )

    # Managed rule that supplies SourceDetails.
    managed_rule = managed_config_rule()
    managed_rule["Source"]["SourceDetails"] = [
        {"EventSource": "aws.config", "MessageType": "ScheduledNotification"}
    ]
    with pytest.raises(ClientError) as exc:
        client.put_config_rule(ConfigRule=managed_rule)
    err = exc.value.response["Error"]
    assert err["Code"] == "InvalidParameterValueException"
    assert (
        "SourceDetails should be null/empty if the owner is AWS. "
        "SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
        in err["Message"]
    )

    # Custom rule that omits SourceDetails.
    custom_rule = custom_config_rule()
    custom_rule["Source"] = {
        "Owner": "CUSTOM_LAMBDA",
        "SourceIdentifier": "test",
    }
    with pytest.raises(ClientError) as exc:
        client.put_config_rule(ConfigRule=custom_rule)
    err = exc.value.response["Error"]
    assert err["Code"] == "InvalidParameterValueException"
    assert (
        "SourceDetails should be null/empty if the owner is AWS. "
        "SourceDetails should be provided if the owner is CUSTOM_LAMBDA"
        in err["Message"]
    )

    # Custom rule whose SourceIdentifier names the function "unknown_func".
    custom_rule = custom_config_rule(func_name="unknown_func")
    with pytest.raises(ClientError) as exc:
        client.put_config_rule(ConfigRule=custom_rule)
    err = exc.value.response["Error"]
    assert err["Code"] == "InsufficientPermissionsException"
    # Only the first fragment interpolates a value, so only it is an f-string.
    assert (
        f'The AWS Lambda function {custom_rule["Source"]["SourceIdentifier"]} '
        "cannot be invoked. Check the specified function ARN, and check the "
        "function's permissions" in err["Message"]
    )
 |