diff --git a/moto/config/models.py b/moto/config/models.py index fbda49844..fbed2ab2a 100644 --- a/moto/config/models.py +++ b/moto/config/models.py @@ -291,12 +291,16 @@ class RecordingGroup(ConfigEmptyDictable): all_supported: bool = True, include_global_resource_types: bool = False, resource_types: Optional[List[str]] = None, + exclusion_by_resource_types: Optional[Dict[str, List[str]]] = None, + recording_strategy: Optional[Dict[str, str]] = None, ): super().__init__() self.all_supported = all_supported self.include_global_resource_types = include_global_resource_types self.resource_types = resource_types + self.exclusion_by_resource_types = exclusion_by_resource_types + self.recording_strategy = recording_strategy class ConfigRecorder(ConfigEmptyDictable): @@ -1189,7 +1193,13 @@ class ConfigBackend(BaseBackend): # Validate the Recording Group: if config_recorder.get("recordingGroup") is None: - recording_group = RecordingGroup() + recording_group = RecordingGroup( + all_supported=True, + include_global_resource_types=False, + resource_types=[], + exclusion_by_resource_types={"resourceTypes": []}, + recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"}, + ) else: rgroup = config_recorder["recordingGroup"] @@ -1197,27 +1207,88 @@ class ConfigBackend(BaseBackend): if not rgroup: raise InvalidRecordingGroupException() - # Can't have both the resource types specified and the other flags as True. - if rgroup.get("resourceTypes") and ( - rgroup.get("allSupported", False) - or rgroup.get("includeGlobalResourceTypes", False) - ): - raise InvalidRecordingGroupException() - - # Must supply resourceTypes if 'allSupported' is not supplied: - if not rgroup.get("allSupported") and not rgroup.get("resourceTypes"): - raise InvalidRecordingGroupException() - - # Validate that the list provided is correct: - self._validate_resource_types(rgroup.get("resourceTypes", [])) - - recording_group = RecordingGroup( - all_supported=rgroup.get("allSupported", True), - include_global_resource_types=rgroup.get( - "includeGlobalResourceTypes", False - ), - resource_types=rgroup.get("resourceTypes", []), + # Recording strategy must be one of the allowed enums: + recording_strategy = rgroup.get("recordingStrategy", {}).get( + "useOnly", None ) + if recording_strategy not in { + None, + "ALL_SUPPORTED_RESOURCE_TYPES", + "INCLUSION_BY_RESOURCE_TYPES", + "EXCLUSION_BY_RESOURCE_TYPES", + }: + raise ValidationException( + f"1 validation error detected: Value '{recording_strategy}' at 'configurationRecorder.recordingGroup.recordingStrategy.useOnly' failed to satisfy constraint:" + f" Member must satisfy enum value set: [INCLUSION_BY_RESOURCE_TYPES, ALL_SUPPORTED_RESOURCE_TYPES, EXCLUSION_BY_RESOURCE_TYPES]" + ) + + # Validate the allSupported: + if rgroup.get("allSupported", False): + if ( + rgroup.get("resourceTypes", []) + or rgroup.get("exclusionByResourceTypes", {}) + or recording_strategy not in {None, "ALL_SUPPORTED_RESOURCE_TYPES"} + ): + raise InvalidRecordingGroupException() + + recording_group = RecordingGroup( + all_supported=True, + include_global_resource_types=rgroup.get( + "includeGlobalResourceTypes", False + ), + resource_types=[], + exclusion_by_resource_types={"resourceTypes": []}, + recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"}, + ) + + # Validate the specifically passed in resource types: + elif rgroup.get("resourceTypes", []): + if ( + rgroup.get("includeGlobalResourceTypes", False) + or rgroup.get("exclusionByResourceTypes", {}) + or recording_strategy not in {None, "INCLUSION_BY_RESOURCE_TYPES"} + ): + raise InvalidRecordingGroupException() + + # Validate that the resource list provided is correct: + self._validate_resource_types(rgroup["resourceTypes"]) + + recording_group = RecordingGroup( + all_supported=False, + include_global_resource_types=False, + resource_types=rgroup["resourceTypes"], + exclusion_by_resource_types={"resourceTypes": []}, + recording_strategy={"useOnly": "INCLUSION_BY_RESOURCE_TYPES"}, + ) + + # Validate the excluded resource types: + elif rgroup.get("exclusionByResourceTypes", {}): + if not rgroup["exclusionByResourceTypes"].get("resourceTypes", []): + raise InvalidRecordingGroupException() + + # The recording strategy must be provided for exclusions. + if ( + rgroup.get("includeGlobalResourceTypes", False) + or recording_strategy != "EXCLUSION_BY_RESOURCE_TYPES" + ): + raise InvalidRecordingGroupException() + + # Validate that the resource list provided is correct: + self._validate_resource_types( + rgroup["exclusionByResourceTypes"]["resourceTypes"] + ) + + recording_group = RecordingGroup( + all_supported=False, + include_global_resource_types=False, + resource_types=[], + exclusion_by_resource_types=rgroup["exclusionByResourceTypes"], + recording_strategy={"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + ) + + # If the resourceTypes is an empty list, this will be reached: + else: + raise InvalidRecordingGroupException() self.recorders[config_recorder["name"]] = ConfigRecorder( config_recorder["roleARN"], diff --git a/tests/test_config/test_config.py b/tests/test_config/test_config.py index 8baa10bb2..966881ff4 100644 --- a/tests/test_config/test_config.py +++ b/tests/test_config/test_config.py @@ -40,31 +40,65 @@ def test_put_configuration_recorder(): in ce.value.response["Error"]["Message"] ) - # With resource types and flags set to True: + # With a combination of bad things: bad_groups = [ { "allSupported": True, "includeGlobalResourceTypes": True, "resourceTypes": ["item"], }, + { + "allSupported": True, + "includeGlobalResourceTypes": True, + "resourceTypes": ["item"], + "exclusionByResourceTypes": {"resourceTypes": ["item"]}, + }, + { + "allSupported": True, + "includeGlobalResourceTypes": True, + "exclusionByResourceTypes": {"resourceTypes": ["item"]}, + }, + { + "allSupported": True, + "includeGlobalResourceTypes": True, + "recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + }, { "allSupported": False, "includeGlobalResourceTypes": True, "resourceTypes": ["item"], }, + { + "resourceTypes": ["item"], + "exclusionByResourceTypes": {"resourceTypes": ["item"]}, + }, + { + "resourceTypes": ["item"], + "recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + }, { "allSupported": True, "includeGlobalResourceTypes": False, "resourceTypes": ["item"], }, + { + "allSupported": True, + "includeGlobalResourceTypes": False, + "exclusionByResourceTypes": {"resourceTypes": ["item"]}, + }, { "allSupported": False, "includeGlobalResourceTypes": False, "resourceTypes": [], }, + { + "exclusionByResourceTypes": {"resourceTypes": []}, + "recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + }, {"includeGlobalResourceTypes": False, "resourceTypes": []}, {"includeGlobalResourceTypes": True}, {"resourceTypes": []}, + {"exclusionByResourceTypes": {"resourceTypes": ["AWS::EC2::Instance"]}}, {}, ] @@ -108,6 +142,54 @@ def test_put_configuration_recorder(): ) assert "AWS::EC2::Instance" in ce.value.response["Error"]["Message"] + # ... and again with exclusions: + with pytest.raises(ClientError) as ce: + client.put_configuration_recorder( + ConfigurationRecorder={ + "name": "default", + "roleARN": "somearn", + "recordingGroup": { + "allSupported": False, + "includeGlobalResourceTypes": False, + # 2 good, and 2 bad: + "exclusionByResourceTypes": { + "resourceTypes": [ + "AWS::EC2::Volume", + "LOLNO", + "AWS::EC2::VPC", + "LOLSTILLNO", + ] + }, + "recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + }, + } + ) + assert ce.value.response["Error"]["Code"] == "ValidationException" + assert "2 validation error detected: Value '['LOLNO', 'LOLSTILLNO']" in str( + ce.value.response["Error"]["Message"] + ) + assert "AWS::EC2::Instance" in ce.value.response["Error"]["Message"] + + # With an invalid recording strategy: + with pytest.raises(ClientError) as ce: + client.put_configuration_recorder( + ConfigurationRecorder={ + "name": "default", + "roleARN": "somearn", + "recordingGroup": { + "allSupported": True, + "includeGlobalResourceTypes": False, + "recordingStrategy": {"useOnly": "LOLWUT"}, + }, + } + ) + assert ce.value.response["Error"]["Code"] == "ValidationException" + assert ( + "1 validation error detected: Value 'LOLWUT' at 'configurationRecorder.recordingGroup.recordingStrategy.useOnly' failed to satisfy " + "constraint: Member must satisfy enum value set: [INCLUSION_BY_RESOURCE_TYPES, ALL_SUPPORTED_RESOURCE_TYPES, EXCLUSION_BY_RESOURCE_TYPES]" + in str(ce.value.response["Error"]["Message"]) + ) + # Create a proper one: client.put_configuration_recorder( ConfigurationRecorder={ @@ -132,6 +214,9 @@ def test_put_configuration_recorder(): "AWS::EC2::Volume" in result[0]["recordingGroup"]["resourceTypes"] and "AWS::EC2::VPC" in result[0]["recordingGroup"]["resourceTypes"] ) + assert result[0]["recordingGroup"]["recordingStrategy"] == { + "useOnly": "INCLUSION_BY_RESOURCE_TYPES" + } # Now update the configuration recorder: client.put_configuration_recorder( @@ -151,6 +236,50 @@ def test_put_configuration_recorder(): assert result[0]["recordingGroup"]["allSupported"] assert result[0]["recordingGroup"]["includeGlobalResourceTypes"] assert len(result[0]["recordingGroup"]["resourceTypes"]) == 0 + assert result[0]["recordingGroup"]["recordingStrategy"] == { + "useOnly": "ALL_SUPPORTED_RESOURCE_TYPES" + } + + # Verify that it works with the strategy passed in: + client.put_configuration_recorder( + ConfigurationRecorder={ + "name": "testrecorder", + "roleARN": "somearn", + "recordingGroup": { + "allSupported": True, + "includeGlobalResourceTypes": True, + "recordingStrategy": {"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"}, + }, + } + ) + assert client.describe_configuration_recorders()["ConfigurationRecorders"][0][ + "recordingGroup" + ]["allSupported"] + + # Update again for exclusions: + client.put_configuration_recorder( + ConfigurationRecorder={ + "name": "testrecorder", + "roleARN": "somearn", + "recordingGroup": { + "exclusionByResourceTypes": {"resourceTypes": ["AWS::EC2::Instance"]}, + "recordingStrategy": {"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, + }, + } + ) + result = client.describe_configuration_recorders()["ConfigurationRecorders"] + assert len(result) == 1 + assert result[0]["name"] == "testrecorder" + assert result[0]["roleARN"] == "somearn" + assert not result[0]["recordingGroup"]["allSupported"] + assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"] + assert not result[0]["recordingGroup"]["resourceTypes"] + assert result[0]["recordingGroup"]["exclusionByResourceTypes"]["resourceTypes"] == [ + "AWS::EC2::Instance" + ] + assert result[0]["recordingGroup"]["recordingStrategy"] == { + "useOnly": "EXCLUSION_BY_RESOURCE_TYPES" + } # With a default recording group (i.e. lacking one) client.put_configuration_recorder( @@ -162,7 +291,11 @@ def test_put_configuration_recorder(): assert result[0]["roleARN"] == "somearn" assert result[0]["recordingGroup"]["allSupported"] assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"] - assert not result[0]["recordingGroup"].get("resourceTypes") + assert not result[0]["recordingGroup"]["resourceTypes"] + assert not result[0]["recordingGroup"]["exclusionByResourceTypes"]["resourceTypes"] + assert result[0]["recordingGroup"]["recordingStrategy"] == { + "useOnly": "ALL_SUPPORTED_RESOURCE_TYPES" + } # Can currently only have exactly 1 Config Recorder in an account/region: with pytest.raises(ClientError) as ce: