AWS Config support for Resource Type Exclusions (#6428)

This commit is contained in:
Mike Grima 2023-06-21 07:41:00 -04:00 committed by GitHub
parent c942883183
commit af4a8e2ba6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 227 additions and 23 deletions

View File

@ -291,12 +291,16 @@ class RecordingGroup(ConfigEmptyDictable):
all_supported: bool = True,
include_global_resource_types: bool = False,
resource_types: Optional[List[str]] = None,
exclusion_by_resource_types: Optional[Dict[str, List[str]]] = None,
recording_strategy: Optional[Dict[str, str]] = None,
):
super().__init__()
self.all_supported = all_supported
self.include_global_resource_types = include_global_resource_types
self.resource_types = resource_types
self.exclusion_by_resource_types = exclusion_by_resource_types
self.recording_strategy = recording_strategy
class ConfigRecorder(ConfigEmptyDictable):
@ -1189,7 +1193,13 @@ class ConfigBackend(BaseBackend):
# Validate the Recording Group:
if config_recorder.get("recordingGroup") is None:
recording_group = RecordingGroup()
recording_group = RecordingGroup(
all_supported=True,
include_global_resource_types=False,
resource_types=[],
exclusion_by_resource_types={"resourceTypes": []},
recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"},
)
else:
rgroup = config_recorder["recordingGroup"]
@ -1197,27 +1207,88 @@ class ConfigBackend(BaseBackend):
if not rgroup:
raise InvalidRecordingGroupException()
# Can't have both the resource types specified and the other flags as True.
if rgroup.get("resourceTypes") and (
rgroup.get("allSupported", False)
or rgroup.get("includeGlobalResourceTypes", False)
):
raise InvalidRecordingGroupException()
# Must supply resourceTypes if 'allSupported' is not supplied:
if not rgroup.get("allSupported") and not rgroup.get("resourceTypes"):
raise InvalidRecordingGroupException()
# Validate that the list provided is correct:
self._validate_resource_types(rgroup.get("resourceTypes", []))
recording_group = RecordingGroup(
all_supported=rgroup.get("allSupported", True),
include_global_resource_types=rgroup.get(
"includeGlobalResourceTypes", False
),
resource_types=rgroup.get("resourceTypes", []),
# Recording strategy must be one of the allowed enums:
recording_strategy = rgroup.get("recordingStrategy", {}).get(
"useOnly", None
)
if recording_strategy not in {
None,
"ALL_SUPPORTED_RESOURCE_TYPES",
"INCLUSION_BY_RESOURCE_TYPES",
"EXCLUSION_BY_RESOURCE_TYPES",
}:
raise ValidationException(
f"1 validation error detected: Value '{recording_strategy}' at 'configurationRecorder.recordingGroup.recordingStrategy.useOnly' failed to satisfy constraint:"
f" Member must satisfy enum value set: [INCLUSION_BY_RESOURCE_TYPES, ALL_SUPPORTED_RESOURCE_TYPES, EXCLUSION_BY_RESOURCE_TYPES]"
)
# Validate the allSupported:
if rgroup.get("allSupported", False):
if (
rgroup.get("resourceTypes", [])
or rgroup.get("exclusionByResourceTypes", {})
or recording_strategy not in {None, "ALL_SUPPORTED_RESOURCE_TYPES"}
):
raise InvalidRecordingGroupException()
recording_group = RecordingGroup(
all_supported=True,
include_global_resource_types=rgroup.get(
"includeGlobalResourceTypes", False
),
resource_types=[],
exclusion_by_resource_types={"resourceTypes": []},
recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"},
)
# Validate the specifically passed in resource types:
elif rgroup.get("resourceTypes", []):
if (
rgroup.get("includeGlobalResourceTypes", False)
or rgroup.get("exclusionByResourceTypes", {})
or recording_strategy not in {None, "INCLUSION_BY_RESOURCE_TYPES"}
):
raise InvalidRecordingGroupException()
# Validate that the resource list provided is correct:
self._validate_resource_types(rgroup["resourceTypes"])
recording_group = RecordingGroup(
all_supported=False,
include_global_resource_types=False,
resource_types=rgroup["resourceTypes"],
exclusion_by_resource_types={"resourceTypes": []},
recording_strategy={"useOnly": "INCLUSION_BY_RESOURCE_TYPES"},
)
# Validate the excluded resource types:
elif rgroup.get("exclusionByResourceTypes", {}):
if not rgroup["exclusionByResourceTypes"].get("resourceTypes", []):
raise InvalidRecordingGroupException()
# The recording strategy must be provided for exclusions.
if (
rgroup.get("includeGlobalResourceTypes", False)
or recording_strategy != "EXCLUSION_BY_RESOURCE_TYPES"
):
raise InvalidRecordingGroupException()
# Validate that the resource list provided is correct:
self._validate_resource_types(
rgroup["exclusionByResourceTypes"]["resourceTypes"]
)
recording_group = RecordingGroup(
all_supported=False,
include_global_resource_types=False,
resource_types=[],
exclusion_by_resource_types=rgroup["exclusionByResourceTypes"],
recording_strategy={"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
)
# If the resourceTypes is an empty list, this will be reached:
else:
raise InvalidRecordingGroupException()
self.recorders[config_recorder["name"]] = ConfigRecorder(
config_recorder["roleARN"],

View File

@ -40,31 +40,65 @@ def test_put_configuration_recorder():
in ce.value.response["Error"]["Message"]
)
# With resource types and flags set to True:
# With a combination of bad things:
bad_groups = [
{
"allSupported": True,
"includeGlobalResourceTypes": True,
"resourceTypes": ["item"],
},
{
"allSupported": True,
"includeGlobalResourceTypes": True,
"resourceTypes": ["item"],
"exclusionByResourceTypes": {"resourceTypes": ["item"]},
},
{
"allSupported": True,
"includeGlobalResourceTypes": True,
"exclusionByResourceTypes": {"resourceTypes": ["item"]},
},
{
"allSupported": True,
"includeGlobalResourceTypes": True,
"recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
},
{
"allSupported": False,
"includeGlobalResourceTypes": True,
"resourceTypes": ["item"],
},
{
"resourceTypes": ["item"],
"exclusionByResourceTypes": {"resourceTypes": ["item"]},
},
{
"resourceTypes": ["item"],
"recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
},
{
"allSupported": True,
"includeGlobalResourceTypes": False,
"resourceTypes": ["item"],
},
{
"allSupported": True,
"includeGlobalResourceTypes": False,
"exclusionByResourceTypes": {"resourceTypes": ["item"]},
},
{
"allSupported": False,
"includeGlobalResourceTypes": False,
"resourceTypes": [],
},
{
"exclusionByResourceTypes": {"resourceTypes": []},
"recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
},
{"includeGlobalResourceTypes": False, "resourceTypes": []},
{"includeGlobalResourceTypes": True},
{"resourceTypes": []},
{"exclusionByResourceTypes": {"resourceTypes": ["AWS::EC2::Instance"]}},
{},
]
@ -108,6 +142,54 @@ def test_put_configuration_recorder():
)
assert "AWS::EC2::Instance" in ce.value.response["Error"]["Message"]
# ... and again with exclusions:
with pytest.raises(ClientError) as ce:
client.put_configuration_recorder(
ConfigurationRecorder={
"name": "default",
"roleARN": "somearn",
"recordingGroup": {
"allSupported": False,
"includeGlobalResourceTypes": False,
# 2 good, and 2 bad:
"exclusionByResourceTypes": {
"resourceTypes": [
"AWS::EC2::Volume",
"LOLNO",
"AWS::EC2::VPC",
"LOLSTILLNO",
]
},
"recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
},
}
)
assert ce.value.response["Error"]["Code"] == "ValidationException"
assert "2 validation error detected: Value '['LOLNO', 'LOLSTILLNO']" in str(
ce.value.response["Error"]["Message"]
)
assert "AWS::EC2::Instance" in ce.value.response["Error"]["Message"]
# With an invalid recording strategy:
with pytest.raises(ClientError) as ce:
client.put_configuration_recorder(
ConfigurationRecorder={
"name": "default",
"roleARN": "somearn",
"recordingGroup": {
"allSupported": True,
"includeGlobalResourceTypes": False,
"recordingStrategy": {"useOnly": "LOLWUT"},
},
}
)
assert ce.value.response["Error"]["Code"] == "ValidationException"
assert (
"1 validation error detected: Value 'LOLWUT' at 'configurationRecorder.recordingGroup.recordingStrategy.useOnly' failed to satisfy "
"constraint: Member must satisfy enum value set: [INCLUSION_BY_RESOURCE_TYPES, ALL_SUPPORTED_RESOURCE_TYPES, EXCLUSION_BY_RESOURCE_TYPES]"
in str(ce.value.response["Error"]["Message"])
)
# Create a proper one:
client.put_configuration_recorder(
ConfigurationRecorder={
@ -132,6 +214,9 @@ def test_put_configuration_recorder():
"AWS::EC2::Volume" in result[0]["recordingGroup"]["resourceTypes"]
and "AWS::EC2::VPC" in result[0]["recordingGroup"]["resourceTypes"]
)
assert result[0]["recordingGroup"]["recordingStrategy"] == {
"useOnly": "INCLUSION_BY_RESOURCE_TYPES"
}
# Now update the configuration recorder:
client.put_configuration_recorder(
@ -151,6 +236,50 @@ def test_put_configuration_recorder():
assert result[0]["recordingGroup"]["allSupported"]
assert result[0]["recordingGroup"]["includeGlobalResourceTypes"]
assert len(result[0]["recordingGroup"]["resourceTypes"]) == 0
assert result[0]["recordingGroup"]["recordingStrategy"] == {
"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"
}
# Verify that it works with the strategy passed in:
client.put_configuration_recorder(
ConfigurationRecorder={
"name": "testrecorder",
"roleARN": "somearn",
"recordingGroup": {
"allSupported": True,
"includeGlobalResourceTypes": True,
"recordingStrategy": {"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"},
},
}
)
assert client.describe_configuration_recorders()["ConfigurationRecorders"][0][
"recordingGroup"
]["allSupported"]
# Update again for exclusions:
client.put_configuration_recorder(
ConfigurationRecorder={
"name": "testrecorder",
"roleARN": "somearn",
"recordingGroup": {
"exclusionByResourceTypes": {"resourceTypes": ["AWS::EC2::Instance"]},
"recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"},
},
}
)
result = client.describe_configuration_recorders()["ConfigurationRecorders"]
assert len(result) == 1
assert result[0]["name"] == "testrecorder"
assert result[0]["roleARN"] == "somearn"
assert not result[0]["recordingGroup"]["allSupported"]
assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"]
assert not result[0]["recordingGroup"]["resourceTypes"]
assert result[0]["recordingGroup"]["exclusionByResourceTypes"]["resourceTypes"] == [
"AWS::EC2::Instance"
]
assert result[0]["recordingGroup"]["recordingStrategy"] == {
"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"
}
# With a default recording group (i.e. lacking one)
client.put_configuration_recorder(
@ -162,7 +291,11 @@ def test_put_configuration_recorder():
assert result[0]["roleARN"] == "somearn"
assert result[0]["recordingGroup"]["allSupported"]
assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"]
assert not result[0]["recordingGroup"].get("resourceTypes")
assert not result[0]["recordingGroup"]["resourceTypes"]
assert not result[0]["recordingGroup"]["exclusionByResourceTypes"]["resourceTypes"]
assert result[0]["recordingGroup"]["recordingStrategy"] == {
"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"
}
# Can currently only have exactly 1 Config Recorder in an account/region:
with pytest.raises(ClientError) as ce: