From 936656ed92c203361ef2e78cc384d0928147c33d Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 18 Oct 2023 18:29:20 +0000 Subject: [PATCH] Service: Inspector2 (#6925) --- IMPLEMENTATION_COVERAGE.md | 53 +++- docs/docs/services/inspector2.rst | 110 +++++++ moto/__init__.py | 9 +- moto/backend_index.py | 1 + moto/inspector2/__init__.py | 5 + moto/inspector2/models.py | 292 ++++++++++++++++++ moto/inspector2/responses.py | 151 +++++++++ moto/inspector2/urls.py | 29 ++ moto/ivs/urls.py | 16 +- moto/moto_api/_internal/models.py | 11 + moto/moto_api/_internal/responses.py | 22 ++ moto/moto_api/_internal/urls.py | 1 + tests/test_inspector2/__init__.py | 0 tests/test_inspector2/test_inspector2.py | 60 ++++ .../test_inspector2_admin_accounts.py | 29 ++ .../test_inspector2/test_inspector2_enable.py | 63 ++++ .../test_inspector2_findings.py | 59 ++++ .../test_inspector2_members.py | 22 ++ .../test_inspector2_organization.py | 45 +++ tests/test_inspector2/test_inspector2_tags.py | 49 +++ 20 files changed, 1011 insertions(+), 16 deletions(-) create mode 100644 docs/docs/services/inspector2.rst create mode 100644 moto/inspector2/__init__.py create mode 100644 moto/inspector2/models.py create mode 100644 moto/inspector2/responses.py create mode 100644 moto/inspector2/urls.py create mode 100644 tests/test_inspector2/__init__.py create mode 100644 tests/test_inspector2/test_inspector2.py create mode 100644 tests/test_inspector2/test_inspector2_admin_accounts.py create mode 100644 tests/test_inspector2/test_inspector2_enable.py create mode 100644 tests/test_inspector2/test_inspector2_findings.py create mode 100644 tests/test_inspector2/test_inspector2_members.py create mode 100644 tests/test_inspector2/test_inspector2_organization.py create mode 100644 tests/test_inspector2/test_inspector2_tags.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index bc82c94c0..f0ff003b3 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -3838,6 +3838,58 @@ - [ ] update_user +## inspector2 +
+41% implemented + +- [X] associate_member +- [X] batch_get_account_status +- [ ] batch_get_code_snippet +- [ ] batch_get_finding_details +- [ ] batch_get_free_trial_info +- [ ] batch_get_member_ec2_deep_inspection_status +- [ ] batch_update_member_ec2_deep_inspection_status +- [ ] cancel_findings_report +- [ ] cancel_sbom_export +- [X] create_filter +- [ ] create_findings_report +- [ ] create_sbom_export +- [X] delete_filter +- [X] describe_organization_configuration +- [X] disable +- [X] disable_delegated_admin_account +- [X] disassociate_member +- [X] enable +- [X] enable_delegated_admin_account +- [ ] get_configuration +- [ ] get_delegated_admin_account +- [ ] get_ec2_deep_inspection_configuration +- [ ] get_encryption_key +- [ ] get_findings_report_status +- [X] get_member +- [ ] get_sbom_export +- [ ] list_account_permissions +- [ ] list_coverage +- [ ] list_coverage_statistics +- [X] list_delegated_admin_accounts +- [X] list_filters +- [ ] list_finding_aggregations +- [X] list_findings +- [X] list_members +- [X] list_tags_for_resource +- [ ] list_usage_totals +- [ ] reset_encryption_key +- [ ] search_vulnerabilities +- [X] tag_resource +- [X] untag_resource +- [ ] update_configuration +- [ ] update_ec2_deep_inspection_configuration +- [ ] update_encryption_key +- [ ] update_filter +- [ ] update_org_ec2_deep_inspection_configuration +- [X] update_organization_configuration +
+ ## iot
32% implemented @@ -7515,7 +7567,6 @@ - imagebuilder - importexport - inspector -- inspector2 - internetmonitor - iot-jobs-data - iot-roborunner diff --git a/docs/docs/services/inspector2.rst b/docs/docs/services/inspector2.rst new file mode 100644 index 000000000..1cb6fe3ac --- /dev/null +++ b/docs/docs/services/inspector2.rst @@ -0,0 +1,110 @@ +.. _implementedservice_inspector2: + +.. |start-h3| raw:: html + +

+ +.. |end-h3| raw:: html + +

+ +========== +inspector2 +========== + +|start-h3| Example usage |end-h3| + +.. sourcecode:: python + + @mock_inspector2 + def test_inspector2_behaviour: + boto3.client("inspector2") + ... + + + +|start-h3| Implemented features for this service |end-h3| + +- [X] associate_member +- [X] batch_get_account_status +- [ ] batch_get_code_snippet +- [ ] batch_get_finding_details +- [ ] batch_get_free_trial_info +- [ ] batch_get_member_ec2_deep_inspection_status +- [ ] batch_update_member_ec2_deep_inspection_status +- [ ] cancel_findings_report +- [ ] cancel_sbom_export +- [X] create_filter +- [ ] create_findings_report +- [ ] create_sbom_export +- [X] delete_filter +- [X] describe_organization_configuration +- [X] disable +- [X] disable_delegated_admin_account +- [X] disassociate_member +- [X] enable +- [X] enable_delegated_admin_account +- [ ] get_configuration +- [ ] get_delegated_admin_account +- [ ] get_ec2_deep_inspection_configuration +- [ ] get_encryption_key +- [ ] get_findings_report_status +- [X] get_member +- [ ] get_sbom_export +- [ ] list_account_permissions +- [ ] list_coverage +- [ ] list_coverage_statistics +- [X] list_delegated_admin_accounts +- [X] list_filters + + Pagination is not yet implemented + + +- [ ] list_finding_aggregations +- [X] list_findings + + This call will always return 0 findings by default. + + You can use a dedicated API to override this, by configuring a queue of expected results. + + A request to `list_findings` will take the first result from that queue, and assign it to the provided arguments. Subsequent calls using the same arguments will return the same result. Other requests using a different SQL-query will take the next result from the queue, or return an empty result if the queue is empty. + + Configure this queue by making an HTTP request to `/moto-api/static/inspector2/findings-results`. An example invocation looks like this: + + .. sourcecode:: python + + findings = { + "results": [ + [{ + "awsAccountId": "111122223333", + "codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."}, + }], + # .. other findings as required + ], + "account_id": "123456789012", # This is the default - can be omitted + "region": "us-east-1", # This is the default - can be omitted + } + resp = requests.post( + "http://motoapi.amazonaws.com:5000/moto-api/static/inspector2/findings-results", + json=findings, + ) + + inspector2 = boto3.client("inspector2", region_name="us-east-1") + findings = inspector2.list_findings()["findings"] + + + +- [X] list_members +- [X] list_tags_for_resource +- [ ] list_usage_totals +- [ ] reset_encryption_key +- [ ] search_vulnerabilities +- [X] tag_resource +- [X] untag_resource +- [ ] update_configuration +- [ ] update_ec2_deep_inspection_configuration +- [ ] update_encryption_key +- [ ] update_filter +- [ ] update_org_ec2_deep_inspection_configuration +- [X] update_organization_configuration + diff --git a/moto/__init__.py b/moto/__init__.py index 580dd5f35..a686ffe55 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -100,13 +100,14 @@ mock_firehose = lazy_load(".firehose", "mock_firehose") mock_forecast = lazy_load(".forecast", "mock_forecast") mock_greengrass = lazy_load(".greengrass", "mock_greengrass") mock_glacier = lazy_load(".glacier", "mock_glacier") -mock_glue = lazy_load(".glue", "mock_glue", boto3_name="glue") +mock_glue = lazy_load(".glue", "mock_glue") mock_guardduty = lazy_load(".guardduty", "mock_guardduty") mock_iam = lazy_load(".iam", "mock_iam") mock_identitystore = lazy_load(".identitystore", "mock_identitystore") +mock_inspector2 = lazy_load(".inspector2", "mock_inspector2") mock_iot = lazy_load(".iot", "mock_iot") mock_iotdata = lazy_load(".iotdata", "mock_iotdata", boto3_name="iot-data") -mock_ivs = lazy_load(".ivs", "mock_ivs", boto3_name="ivs") +mock_ivs = lazy_load(".ivs", "mock_ivs") mock_kinesis = lazy_load(".kinesis", "mock_kinesis") mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo") mock_kinesisvideoarchivedmedia = lazy_load( @@ -151,9 +152,7 @@ mock_resourcegroupstaggingapi = lazy_load( ) mock_robomaker = lazy_load(".robomaker", "mock_robomaker") mock_route53 = lazy_load(".route53", "mock_route53") -mock_route53resolver = lazy_load( - ".route53resolver", "mock_route53resolver", boto3_name="route53resolver" -) +mock_route53resolver = lazy_load(".route53resolver", "mock_route53resolver") mock_s3 = lazy_load(".s3", "mock_s3") mock_s3control = lazy_load(".s3control", "mock_s3control") mock_sagemaker = lazy_load(".sagemaker", "mock_sagemaker") diff --git a/moto/backend_index.py b/moto/backend_index.py index a55cbdd3f..408f697ec 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -85,6 +85,7 @@ backend_url_patterns = [ ("guardduty", re.compile("https?://guardduty\\.(.+)\\.amazonaws\\.com")), ("iam", re.compile("https?://iam\\.(.*\\.)?amazonaws\\.com")), ("identitystore", re.compile("https?://identitystore\\.(.+)\\.amazonaws\\.com")), + ("inspector2", re.compile("https?://inspector2\\.(.+)\\.amazonaws\\.com")), ("iot", re.compile("https?://iot\\.(.+)\\.amazonaws\\.com")), ("iot-data", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")), ("iot-data", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")), diff --git a/moto/inspector2/__init__.py b/moto/inspector2/__init__.py new file mode 100644 index 000000000..0ce83bc98 --- /dev/null +++ b/moto/inspector2/__init__.py @@ -0,0 +1,5 @@ +"""inspector2 module initialization; sets value for base decorator.""" +from .models import inspector2_backends +from ..core.models import base_decorator + +mock_inspector2 = base_decorator(inspector2_backends) diff --git a/moto/inspector2/models.py b/moto/inspector2/models.py new file mode 100644 index 000000000..8ae024569 --- /dev/null +++ b/moto/inspector2/models.py @@ -0,0 +1,292 @@ +import json +from typing import Any, Dict, List, Optional, Iterable +from moto.core import BaseBackend, BackendDict, BaseModel +from moto.core.utils import unix_time +from moto.moto_api._internal import mock_random +from moto.utilities.tagging_service import TaggingService + + +class FilterResource(BaseModel): + def __init__( + self, + region: str, + account_id: str, + name: str, + reason: Optional[str], + action: str, + description: Optional[str], + filter_criteria: Dict[str, Any], + backend: "Inspector2Backend", + ): + filter_id = mock_random.get_random_hex(10) + self.owner_id = account_id + self.arn = f"arn:aws:inspector2:{region}:{account_id}:owner/{self.owner_id}/filter/{filter_id}" + self.name = name + self.reason = reason + self.action = action + self.description = description + self.filter_criteria = filter_criteria + self.created_at = unix_time() + self.backend = backend + + def to_json(self) -> Dict[str, Any]: + return { + "action": self.action, + "arn": self.arn, + "createdAt": self.created_at, + "criteria": self.filter_criteria, + "description": self.description, + "name": self.name, + "ownerId": self.owner_id, + "reason": self.reason, + "tags": self.backend.list_tags_for_resource(self.arn), + } + + +class AccountStatus(BaseModel): + def __init__(self, account_id: str): + self.account_id = account_id + self.ec2 = "DISABLED" + self.ecr = "DISABLED" + self._lambda = "DISABLED" + self.lambda_code = "DISABLED" + + def toggle(self, resource_types: List[str], enable: bool) -> None: + if "EC2" in resource_types: + self.ec2 = "ENABLED" if enable else "DISABLED" + if "ECR" in resource_types: + self.ecr = "ENABLED" if enable else "DISABLED" + if "LAMBDA" in resource_types: + self._lambda = "ENABLED" if enable else "DISABLED" + if "LAMBDA_CODE" in resource_types or "LAMBDACODE" in resource_types: + self.lambda_code = "ENABLED" if enable else "DISABLED" + + def to_json(self) -> Dict[str, Any]: + return { + "accountId": self.account_id, + "resourceStatus": { + "ec2": self.ec2, + "ecr": self.ecr, + "lambda": self._lambda, + "lambdaCode": self.lambda_code, + }, + "status": self._status(), + } + + def _status(self) -> str: + return ( + "ENABLED" + if "ENABLED" in [self.ec2, self.ecr, self._lambda, self.lambda_code] + else "DISABLED" + ) + + def to_batch_json(self) -> Dict[str, Any]: + return { + "accountId": self.account_id, + "resourceState": { + "ec2": {"status": self.ec2}, + "ecr": {"status": self.ecr}, + "lambda": {"status": self._lambda}, + "lambdaCode": {"status": self.lambda_code}, + }, + "state": {"status": self._status()}, + } + + +class Member(BaseModel): + def __init__(self, account_id: str, admin_account_id: str): + self.account_id = account_id + self.admin_account_id = admin_account_id + self.status = "ENABLED" + self.updated_at = unix_time() + + def to_json(self) -> Dict[str, Any]: + return { + "accountId": self.account_id, + "delegatedAdminAccountId": self.admin_account_id, + "relationshipStatus": self.status, + "updatedAt": self.updated_at, + } + + +class Inspector2Backend(BaseBackend): + def __init__(self, region_name: str, account_id: str): + super().__init__(region_name, account_id) + self.filters: Dict[str, FilterResource] = dict() + self.admin_accounts: Dict[str, str] = dict() + self.account_status: Dict[str, AccountStatus] = dict() + self.members: Dict[str, Member] = dict() + self.org_config = { + "ec2": False, + "ecr": False, + "lambda": False, + "lambdaCode": False, + } + self.tagger = TaggingService() + self.findings_queue: List[Any] = [] + self.findings: Dict[str, Any] = {} + + def create_filter( + self, + action: str, + description: str, + filter_criteria: Dict[str, Any], + name: str, + reason: str, + tags: Dict[str, str], + ) -> str: + _filter = FilterResource( + region=self.region_name, + account_id=self.account_id, + action=action, + description=description, + filter_criteria=filter_criteria, + name=name, + reason=reason, + backend=self, + ) + self.filters[_filter.arn] = _filter + self.tag_resource(_filter.arn, tags) + return _filter.arn + + def delete_filter(self, arn: str) -> None: + self.filters.pop(arn, None) + + def list_filters(self, action: str, arns: List[str]) -> Iterable[FilterResource]: + """ + Pagination is not yet implemented + """ + return [ + f + for f in self.filters.values() + if (arns and f.arn in arns) + or (action and f.action == action) + or (not arns and not action) + ] + + def list_findings( + self, + filter_criteria: List[Dict[str, Any]], + max_results: str, + next_token: str, + sort_criteria: str, + ) -> List[Dict[str, Any]]: + """ + This call will always return 0 findings by default. + + You can use a dedicated API to override this, by configuring a queue of expected results. + + A request to `list_findings` will take the first result from that queue, and assign it to the provided arguments. Subsequent calls using the same arguments will return the same result. Other requests using a different SQL-query will take the next result from the queue, or return an empty result if the queue is empty. + + Configure this queue by making an HTTP request to `/moto-api/static/inspector2/findings-results`. An example invocation looks like this: + + .. sourcecode:: python + + findings = { + "results": [ + [{ + "awsAccountId": "111122223333", + "codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."}, + }], + # .. other findings as required + ], + "account_id": "123456789012", # This is the default - can be omitted + "region": "us-east-1", # This is the default - can be omitted + } + resp = requests.post( + "http://motoapi.amazonaws.com:5000/moto-api/static/inspector2/findings-results", + json=findings, + ) + + inspector2 = boto3.client("inspector2", region_name="us-east-1") + findings = inspector2.list_findings()["findings"] + + """ + key = f"{json.dumps(filter_criteria)}--{max_results}--{next_token}--{sort_criteria}" + if key not in self.findings and self.findings_queue: + self.findings[key] = self.findings_queue.pop(0) + if key in self.findings: + return self.findings[key] + else: + return [] + + def list_delegated_admin_accounts(self) -> Dict[str, str]: + return self.admin_accounts + + def enable_delegated_admin_account(self, account_id: str) -> None: + self.admin_accounts[account_id] = "ENABLED" + + def disable_delegated_admin_account(self, account_id: str) -> None: + self.admin_accounts[account_id] = "DISABLED" + + def describe_organization_configuration(self) -> Dict[str, Any]: + return {"autoEnable": self.org_config, "maxAccountLimitReached": False} + + def update_organization_configuration( + self, auto_enable: Dict[str, bool] + ) -> Dict[str, Any]: + self.org_config.update(auto_enable) + return {"autoEnable": self.org_config} + + def disable( + self, account_ids: List[str], resource_types: List[str] + ) -> List[Dict[str, Any]]: + for acct in account_ids: + if acct not in self.account_status: + self.account_status[acct] = AccountStatus(acct) + self.account_status[acct].toggle(resource_types, enable=False) + + return [ + status.to_json() + for a_id, status in self.account_status.items() + if a_id in account_ids + ] + + def enable( + self, account_ids: List[str], resource_types: List[str] + ) -> List[Dict[str, Any]]: + for acct in account_ids: + if acct not in self.account_status: + self.account_status[acct] = AccountStatus(acct) + self.account_status[acct].toggle(resource_types, enable=True) + + return [ + status.to_json() + for a_id, status in self.account_status.items() + if a_id in account_ids + ] + + def batch_get_account_status(self, account_ids: List[str]) -> List[Dict[str, Any]]: + return [ + status.to_batch_json() + for a_id, status in self.account_status.items() + if a_id in account_ids + ] + + def list_members(self) -> Iterable[Member]: + return self.members.values() + + def associate_member(self, account_id: str) -> None: + self.members[account_id] = Member( + account_id=account_id, admin_account_id=self.account_id + ) + + def disassociate_member(self, account_id: str) -> None: + self.members[account_id].status = "DISABLED" + + def get_member(self, account_id: str) -> Member: + return self.members[account_id] + + def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: + self.tagger.tag_resource( + resource_arn, TaggingService.convert_dict_to_tags_input(tags) + ) + + def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: + return self.tagger.get_tag_dict_for_resource(resource_arn) + + def untag_resource(self, arn: str, tag_keys: List[str]) -> None: + self.tagger.untag_resource_using_names(arn, tag_keys) + + +inspector2_backends = BackendDict(Inspector2Backend, "inspector2") diff --git a/moto/inspector2/responses.py b/moto/inspector2/responses.py new file mode 100644 index 000000000..fb457d713 --- /dev/null +++ b/moto/inspector2/responses.py @@ -0,0 +1,151 @@ +import json +from typing import Any, Dict, List +from urllib.parse import unquote + +from moto.core.common_types import TYPE_RESPONSE +from moto.core.responses import BaseResponse +from .models import inspector2_backends, Inspector2Backend + + +class Inspector2Response(BaseResponse): + def __init__(self) -> None: + super().__init__(service_name="inspector2") + + @property + def inspector2_backend(self) -> Inspector2Backend: + return inspector2_backends[self.current_account][self.region] + + def create_filter(self) -> str: + action = self._get_param("action") + description = self._get_param("description") + filter_criteria = self._get_param("filterCriteria") + name = self._get_param("name") + reason = self._get_param("reason") + tags = self._get_param("tags") + arn = self.inspector2_backend.create_filter( + action=action, + description=description, + filter_criteria=filter_criteria, + name=name, + reason=reason, + tags=tags, + ) + return json.dumps(dict(arn=arn)) + + def delete_filter(self) -> str: + arn = self._get_param("arn") + self.inspector2_backend.delete_filter(arn=arn) + return json.dumps(dict(arn=arn)) + + def list_filters(self) -> str: + action = self._get_param("action") + arns = self._get_param("arns") + filters = self.inspector2_backend.list_filters(action=action, arns=arns) + return json.dumps({"filters": [f.to_json() for f in filters]}) + + def list_findings(self) -> str: + filter_criteria = self._get_param("filterCriteria") + max_results = self._get_param("maxResults") + next_token = self._get_param("nextToken") + sort_criteria = self._get_param("sortCriteria") + findings = self.inspector2_backend.list_findings( + filter_criteria=filter_criteria, + max_results=max_results, + next_token=next_token, + sort_criteria=sort_criteria, + ) + return json.dumps(dict(findings=findings)) + + def list_delegated_admin_accounts(self) -> str: + accounts = self.inspector2_backend.list_delegated_admin_accounts() + return json.dumps( + { + "delegatedAdminAccounts": [ + {"accountId": key, "status": val} for key, val in accounts.items() + ] + } + ) + + def enable_delegated_admin_account(self) -> str: + account_id = self._get_param("delegatedAdminAccountId") + self.inspector2_backend.enable_delegated_admin_account(account_id) + return json.dumps({"delegatedAdminAccountId": account_id}) + + def disable_delegated_admin_account(self) -> str: + account_id = self._get_param("delegatedAdminAccountId") + self.inspector2_backend.disable_delegated_admin_account(account_id) + return json.dumps({"delegatedAdminAccountId": account_id}) + + def describe_organization_configuration(self) -> str: + config = self.inspector2_backend.describe_organization_configuration() + return json.dumps(config) + + def update_organization_configuration(self) -> str: + auto_enable = self._get_param("autoEnable") + config = self.inspector2_backend.update_organization_configuration(auto_enable) + return json.dumps(config) + + def enable(self) -> str: + account_ids = self._get_param("accountIds") + resource_types = self._get_param("resourceTypes") + accounts = self.inspector2_backend.enable(account_ids, resource_types) + failed: List[Dict[str, Any]] = [] + return json.dumps({"accounts": accounts, "failedAccounts": failed}) + + def disable(self) -> str: + account_ids = self._get_param("accountIds") + resource_types = self._get_param("resourceTypes") + accounts = self.inspector2_backend.disable(account_ids, resource_types) + failed: List[Dict[str, Any]] = [] + return json.dumps({"accounts": accounts, "failedAccounts": failed}) + + def batch_get_account_status(self) -> str: + account_ids = self._get_param("accountIds") + accounts = self.inspector2_backend.batch_get_account_status(account_ids) + failed: List[Dict[str, Any]] = [] + return json.dumps({"accounts": accounts, "failedAccounts": failed}) + + def list_members(self) -> str: + members = self.inspector2_backend.list_members() + return json.dumps({"members": [m.to_json() for m in members]}) + + def associate_member(self) -> str: + account_id = self._get_param("accountId") + self.inspector2_backend.associate_member(account_id) + return json.dumps({"accountId": account_id}) + + def disassociate_member(self) -> str: + account_id = self._get_param("accountId") + self.inspector2_backend.disassociate_member(account_id) + return json.dumps({"accountId": account_id}) + + def get_member(self) -> str: + account_id = self._get_param("accountId") + member = self.inspector2_backend.get_member(account_id) + return json.dumps({"member": member.to_json()}) + + def list_tags_for_resource(self) -> TYPE_RESPONSE: + arn = unquote(self.path.split("/tags/")[-1]) + tags = self.inspector2_backend.list_tags_for_resource(arn) + return 200, {}, json.dumps({"tags": tags}) + + def tag_resource(self) -> TYPE_RESPONSE: + resource_arn = unquote(self.path.split("/tags/")[-1]) + tags = self._get_param("tags") + self.inspector2_backend.tag_resource(resource_arn=resource_arn, tags=tags) + return 200, {}, "{}" + + def untag_resource(self) -> TYPE_RESPONSE: + resource_arn = unquote(self.path.split("/tags/")[-1]) + tag_keys = self.querystring.get("tagKeys") + self.inspector2_backend.untag_resource(resource_arn, tag_keys) # type: ignore + return 200, {}, "{}" + + def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] + self.setup_class(request, full_url, headers) + if request.method == "GET": + return self.list_tags_for_resource() + if request.method == "POST": + return self.tag_resource() + if request.method == "DELETE": + return self.untag_resource() diff --git a/moto/inspector2/urls.py b/moto/inspector2/urls.py new file mode 100644 index 000000000..9930eb033 --- /dev/null +++ b/moto/inspector2/urls.py @@ -0,0 +1,29 @@ +"""inspector2 base URL and path.""" +from .responses import Inspector2Response + +url_bases = [ + r"https?://inspector2\.(.+)\.amazonaws\.com", +] + + +url_paths = { + "{0}/delegatedadminaccounts/disable$": Inspector2Response.dispatch, + "{0}/delegatedadminaccounts/enable$": Inspector2Response.dispatch, + "{0}/delegatedadminaccounts/list$": Inspector2Response.dispatch, + "{0}/disable$": Inspector2Response.dispatch, + "{0}/enable$": Inspector2Response.dispatch, + "{0}/filters/create$": Inspector2Response.dispatch, + "{0}/filters/delete$": Inspector2Response.dispatch, + "{0}/filters/list$": Inspector2Response.dispatch, + "{0}/findings/list$": Inspector2Response.dispatch, + "{0}/members/associate$": Inspector2Response.dispatch, + "{0}/members/get": Inspector2Response.dispatch, + "{0}/members/list": Inspector2Response.dispatch, + "{0}/members/disassociate$": Inspector2Response.dispatch, + "{0}/status/batch/get$": Inspector2Response.dispatch, + "{0}/organizationconfiguration/describe$": Inspector2Response.dispatch, + "{0}/organizationconfiguration/update$": Inspector2Response.dispatch, + "{0}/tags/(?P.+)$": Inspector2Response.method_dispatch( + Inspector2Response.tags # type: ignore + ), +} diff --git a/moto/ivs/urls.py b/moto/ivs/urls.py index 7c4a85267..f0edd4af1 100644 --- a/moto/ivs/urls.py +++ b/moto/ivs/urls.py @@ -6,15 +6,11 @@ url_bases = [ r"https?://ivs\.(.+)\.amazonaws\.com", ] - -response = IVSResponse() - - url_paths = { - "{0}/CreateChannel": response.dispatch, - "{0}/ListChannels": response.dispatch, - "{0}/GetChannel": response.dispatch, - "{0}/BatchGetChannel": response.dispatch, - "{0}/UpdateChannel": response.dispatch, - "{0}/DeleteChannel": response.dispatch, + "{0}/CreateChannel": IVSResponse.dispatch, + "{0}/ListChannels": IVSResponse.dispatch, + "{0}/GetChannel": IVSResponse.dispatch, + "{0}/BatchGetChannel": IVSResponse.dispatch, + "{0}/UpdateChannel": IVSResponse.dispatch, + "{0}/DeleteChannel": IVSResponse.dispatch, } diff --git a/moto/moto_api/_internal/models.py b/moto/moto_api/_internal/models.py index af75c287b..6690cb6c4 100644 --- a/moto/moto_api/_internal/models.py +++ b/moto/moto_api/_internal/models.py @@ -83,5 +83,16 @@ class MotoAPIBackend(BaseBackend): ) ) + def set_inspector2_findings_result( + self, + results: Optional[List[List[Dict[str, Any]]]], + account_id: str, + region: str, + ) -> None: + from moto.inspector2.models import inspector2_backends + + backend = inspector2_backends[account_id][region] + backend.findings_queue.append(results) + moto_api_backend = MotoAPIBackend(region_name="global", account_id=DEFAULT_ACCOUNT_ID) diff --git a/moto/moto_api/_internal/responses.py b/moto/moto_api/_internal/responses.py index e4c885942..fc094a5a5 100644 --- a/moto/moto_api/_internal/responses.py +++ b/moto/moto_api/_internal/responses.py @@ -227,3 +227,25 @@ class MotoAPIResponse(BaseResponse): region=region, ) return 201, {}, "" + + def set_inspector2_findings_result( + self, + request: Any, + full_url: str, # pylint: disable=unused-argument + headers: Any, + ) -> TYPE_RESPONSE: + from .models import moto_api_backend + + request_body_size = int(headers["Content-Length"]) + body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8") + body = json.loads(body) + account_id = body.get("account_id", DEFAULT_ACCOUNT_ID) + region = body.get("region", "us-east-1") + + for result in body.get("results", []): + moto_api_backend.set_inspector2_findings_result( + results=result, + account_id=account_id, + region=region, + ) + return 201, {}, "" diff --git a/moto/moto_api/_internal/urls.py b/moto/moto_api/_internal/urls.py index 6b9977355..58b7191e7 100644 --- a/moto/moto_api/_internal/urls.py +++ b/moto/moto_api/_internal/urls.py @@ -13,6 +13,7 @@ url_paths = { "{0}/moto-api/reset-auth": response_instance.reset_auth_response, "{0}/moto-api/seed": response_instance.seed, "{0}/moto-api/static/athena/query-results": response_instance.set_athena_result, + "{0}/moto-api/static/inspector2/findings-results": response_instance.set_inspector2_findings_result, "{0}/moto-api/static/sagemaker/endpoint-results": response_instance.set_sagemaker_result, "{0}/moto-api/static/rds-data/statement-results": response_instance.set_rds_data_result, "{0}/moto-api/state-manager/get-transition": response_instance.get_transition, diff --git a/tests/test_inspector2/__init__.py b/tests/test_inspector2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_inspector2/test_inspector2.py b/tests/test_inspector2/test_inspector2.py new file mode 100644 index 000000000..52ff5a020 --- /dev/null +++ b/tests/test_inspector2/test_inspector2.py @@ -0,0 +1,60 @@ +import boto3 + +from moto import mock_inspector2 + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_inspector2 +def test_create_filter(): + client = boto3.client("inspector2", region_name="us-east-2") + resp = client.create_filter( + name="my_first_filter", + reason="because I said so", + action="NONE", + description="my filter", + filterCriteria={ + "codeVulnerabilityDetectorName": [{"comparison": "EQUALS", "value": "cvdn"}] + }, + ) + assert "arn" in resp + + +@mock_inspector2 +def test_list_filters(): + client = boto3.client("inspector2", region_name="ap-southeast-1") + assert client.list_filters()["filters"] == [] + + arn1 = client.create_filter( + name="my_first_filter", + action="NONE", + filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]}, + )["arn"] + + filters = client.list_filters()["filters"] + assert len(filters) == 1 + assert filters[0]["arn"] == arn1 + + arn2 = client.create_filter( + name="my_second_filter", + action="SUPPRESS", + filterCriteria={"fixAvailable": [{"comparison": "EQUALS", "value": "cvdn"}]}, + )["arn"] + + filters = client.list_filters()["filters"] + assert len(filters) == 2 + + filters = client.list_filters(action="SUPPRESS")["filters"] + assert len(filters) == 1 + assert filters[0]["arn"] == arn2 + + filters = client.list_filters(arns=[arn1])["filters"] + assert len(filters) == 1 + assert filters[0]["arn"] == arn1 + + client.delete_filter(arn=arn1) + + filters = client.list_filters()["filters"] + assert len(filters) == 1 + assert filters[0]["arn"] == arn2 diff --git a/tests/test_inspector2/test_inspector2_admin_accounts.py b/tests/test_inspector2/test_inspector2_admin_accounts.py new file mode 100644 index 000000000..d8e6a7174 --- /dev/null +++ b/tests/test_inspector2/test_inspector2_admin_accounts.py @@ -0,0 +1,29 @@ +import boto3 + +from moto import mock_inspector2 + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_inspector2 +def test_deleted_accounts(): + client = boto3.client("inspector2", region_name="us-east-1") + + assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == [] + + resp = client.enable_delegated_admin_account(delegatedAdminAccountId="111111111111") + assert resp["delegatedAdminAccountId"] == "111111111111" + + assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == [ + {"accountId": "111111111111", "status": "ENABLED"} + ] + + resp = client.disable_delegated_admin_account( + delegatedAdminAccountId="111111111111" + ) + assert resp["delegatedAdminAccountId"] == "111111111111" + + assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == [ + {"accountId": "111111111111", "status": "DISABLED"} + ] diff --git a/tests/test_inspector2/test_inspector2_enable.py b/tests/test_inspector2/test_inspector2_enable.py new file mode 100644 index 000000000..9dc152a23 --- /dev/null +++ b/tests/test_inspector2/test_inspector2_enable.py @@ -0,0 +1,63 @@ +import boto3 + +from moto import mock_inspector2 +from tests import DEFAULT_ACCOUNT_ID + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_inspector2 +def test_organization_configuration(): + client = boto3.client("inspector2", region_name="us-west-1") + + resp = client.enable(accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["EC2", "ECR"]) + assert resp["accounts"] == [ + { + "accountId": DEFAULT_ACCOUNT_ID, + "resourceStatus": { + "ec2": "ENABLED", + "ecr": "ENABLED", + "lambda": "DISABLED", + "lambdaCode": "DISABLED", + }, + "status": "ENABLED", + } + ] + assert resp["failedAccounts"] == [] + + resp = client.batch_get_account_status(accountIds=[DEFAULT_ACCOUNT_ID]) + assert resp["accounts"] == [ + { + "accountId": "123456789012", + "resourceState": { + "ec2": {"status": "ENABLED"}, + "ecr": {"status": "ENABLED"}, + "lambda": {"status": "DISABLED"}, + "lambdaCode": {"status": "DISABLED"}, + }, + "state": {"status": "ENABLED"}, + } + ] + assert resp["failedAccounts"] == [] + + resp = client.disable( + accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["LAMBDA", "ECR"] + ) + assert resp["accounts"] == [ + { + "accountId": DEFAULT_ACCOUNT_ID, + "resourceStatus": { + "ec2": "ENABLED", + "ecr": "DISABLED", + "lambda": "DISABLED", + "lambdaCode": "DISABLED", + }, + "status": "ENABLED", + } + ] + + client.disable(accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["EC2"]) + + resp = client.batch_get_account_status(accountIds=[DEFAULT_ACCOUNT_ID]) + assert resp["accounts"][0]["state"] == {"status": "DISABLED"} diff --git a/tests/test_inspector2/test_inspector2_findings.py b/tests/test_inspector2/test_inspector2_findings.py new file mode 100644 index 000000000..dbaa01213 --- /dev/null +++ b/tests/test_inspector2/test_inspector2_findings.py @@ -0,0 +1,59 @@ +import boto3 +import requests + +from moto import mock_inspector2, settings + + +@mock_inspector2 +def test_set_findings(): + base_url = ( + "localhost:5000" if settings.TEST_SERVER_MODE else "motoapi.amazonaws.com" + ) + + findings = { + "results": [ + [ + { + "awsAccountId": "111122223333", + "codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."}, + } + ] + ], + "region": "us-west-1", + } + resp = requests.post( + f"http://{base_url}/moto-api/static/inspector2/findings-results", + json=findings, + ) + assert resp.status_code == 201 + + inspector2 = boto3.client("inspector2", region_name="us-west-1") + + assert inspector2.list_findings()["findings"] == [ + { + "awsAccountId": "111122223333", + "codeVulnerabilityDetails": { + "cwes": ["a"], + "detectorId": "..", + }, + } + ] + + # Calling list_findings with different arguments returns an empty list + assert ( + inspector2.list_findings( + filterCriteria={"awsAccountId": [{"comparison": "EQUALS", "value": "x"}]} + )["findings"] + == [] + ) + + # Calling list_findings with original arguments returns original list + assert inspector2.list_findings()["findings"] == [ + { + "awsAccountId": "111122223333", + "codeVulnerabilityDetails": { + "cwes": ["a"], + "detectorId": "..", + }, + } + ] diff --git a/tests/test_inspector2/test_inspector2_members.py b/tests/test_inspector2/test_inspector2_members.py new file mode 100644 index 000000000..5be4595fe --- /dev/null +++ b/tests/test_inspector2/test_inspector2_members.py @@ -0,0 +1,22 @@ +import boto3 + +from moto import mock_inspector2 +from tests import DEFAULT_ACCOUNT_ID + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_inspector2 +def test_members(): + client = boto3.client("inspector2", region_name="us-east-1") + + assert client.list_members()["members"] == [] + + resp = client.associate_member(accountId="111111111111") + assert resp["accountId"] == "111111111111" + + resp = client.get_member(accountId="111111111111")["member"] + assert resp["accountId"] == "111111111111" + assert resp["delegatedAdminAccountId"] == DEFAULT_ACCOUNT_ID + assert resp["relationshipStatus"] == "ENABLED" diff --git a/tests/test_inspector2/test_inspector2_organization.py b/tests/test_inspector2/test_inspector2_organization.py new file mode 100644 index 000000000..b37590c05 --- /dev/null +++ b/tests/test_inspector2/test_inspector2_organization.py @@ -0,0 +1,45 @@ +import boto3 + +from moto import mock_inspector2 + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_inspector2 +def test_organization_configuration(): + client = boto3.client("inspector2", region_name="us-west-1") + + resp = client.describe_organization_configuration() + + assert resp["autoEnable"] == { + "ec2": False, + "ecr": False, + "lambda": False, + "lambdaCode": False, + } + assert resp["maxAccountLimitReached"] is False + + resp = client.update_organization_configuration( + autoEnable={ + "ec2": True, + "ecr": False, + "lambda": True, + "lambdaCode": False, + } + ) + assert resp["autoEnable"] == { + "ec2": True, + "ecr": False, + "lambda": True, + "lambdaCode": False, + } + + resp = client.describe_organization_configuration() + + assert resp["autoEnable"] == { + "ec2": True, + "ecr": False, + "lambda": True, + "lambdaCode": False, + } diff --git a/tests/test_inspector2/test_inspector2_tags.py b/tests/test_inspector2/test_inspector2_tags.py new file mode 100644 index 000000000..4897b3a91 --- /dev/null +++ b/tests/test_inspector2/test_inspector2_tags.py @@ -0,0 +1,49 @@ +import boto3 + +from moto import mock_inspector2 + + +@mock_inspector2 +def test_tag_resource(): + client = boto3.client("inspector2", region_name="ap-southeast-1") + arn = client.create_filter( + name="my_first_filter", + action="NONE", + filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]}, + )["arn"] + + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {} + + client.tag_resource(resourceArn=arn, tags={"k1": "v1"}) + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k1": "v1"} + + client.tag_resource(resourceArn=arn, tags={"k2": "v2"}) + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == { + "k1": "v1", + "k2": "v2", + } + + client.untag_resource(resourceArn=arn, tagKeys=["k1"]) + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"} + + +@mock_inspector2 +def test_tag_filter(): + client = boto3.client("inspector2", region_name="ap-southeast-1") + arn = client.create_filter( + name="my_first_filter", + action="NONE", + filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]}, + tags={"k1": "v1"}, + )["arn"] + + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k1": "v1"} + + client.tag_resource(resourceArn=arn, tags={"k2": "v2"}) + assert client.list_tags_for_resource(resourceArn=arn)["tags"] == { + "k1": "v1", + "k2": "v2", + } + + filters = client.list_filters()["filters"] + assert filters[0]["tags"] == {"k1": "v1", "k2": "v2"}