Service: Inspector2 (#6925)

This commit is contained in:
Bert Blommers 2023-10-18 18:29:20 +00:00 committed by GitHub
parent a1f261189e
commit 936656ed92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1011 additions and 16 deletions

View File

@ -3838,6 +3838,58 @@
- [ ] update_user
</details>
## inspector2
<details>
<summary>41% implemented</summary>
- [X] associate_member
- [X] batch_get_account_status
- [ ] batch_get_code_snippet
- [ ] batch_get_finding_details
- [ ] batch_get_free_trial_info
- [ ] batch_get_member_ec2_deep_inspection_status
- [ ] batch_update_member_ec2_deep_inspection_status
- [ ] cancel_findings_report
- [ ] cancel_sbom_export
- [X] create_filter
- [ ] create_findings_report
- [ ] create_sbom_export
- [X] delete_filter
- [X] describe_organization_configuration
- [X] disable
- [X] disable_delegated_admin_account
- [X] disassociate_member
- [X] enable
- [X] enable_delegated_admin_account
- [ ] get_configuration
- [ ] get_delegated_admin_account
- [ ] get_ec2_deep_inspection_configuration
- [ ] get_encryption_key
- [ ] get_findings_report_status
- [X] get_member
- [ ] get_sbom_export
- [ ] list_account_permissions
- [ ] list_coverage
- [ ] list_coverage_statistics
- [X] list_delegated_admin_accounts
- [X] list_filters
- [ ] list_finding_aggregations
- [X] list_findings
- [X] list_members
- [X] list_tags_for_resource
- [ ] list_usage_totals
- [ ] reset_encryption_key
- [ ] search_vulnerabilities
- [X] tag_resource
- [X] untag_resource
- [ ] update_configuration
- [ ] update_ec2_deep_inspection_configuration
- [ ] update_encryption_key
- [ ] update_filter
- [ ] update_org_ec2_deep_inspection_configuration
- [X] update_organization_configuration
</details>
## iot
<details>
<summary>32% implemented</summary>
@ -7515,7 +7567,6 @@
- imagebuilder
- importexport
- inspector
- inspector2
- internetmonitor
- iot-jobs-data
- iot-roborunner

View File

@ -0,0 +1,110 @@
.. _implementedservice_inspector2:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
==========
inspector2
==========
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_inspector2
def test_inspector2_behaviour:
boto3.client("inspector2")
...
|start-h3| Implemented features for this service |end-h3|
- [X] associate_member
- [X] batch_get_account_status
- [ ] batch_get_code_snippet
- [ ] batch_get_finding_details
- [ ] batch_get_free_trial_info
- [ ] batch_get_member_ec2_deep_inspection_status
- [ ] batch_update_member_ec2_deep_inspection_status
- [ ] cancel_findings_report
- [ ] cancel_sbom_export
- [X] create_filter
- [ ] create_findings_report
- [ ] create_sbom_export
- [X] delete_filter
- [X] describe_organization_configuration
- [X] disable
- [X] disable_delegated_admin_account
- [X] disassociate_member
- [X] enable
- [X] enable_delegated_admin_account
- [ ] get_configuration
- [ ] get_delegated_admin_account
- [ ] get_ec2_deep_inspection_configuration
- [ ] get_encryption_key
- [ ] get_findings_report_status
- [X] get_member
- [ ] get_sbom_export
- [ ] list_account_permissions
- [ ] list_coverage
- [ ] list_coverage_statistics
- [X] list_delegated_admin_accounts
- [X] list_filters
Pagination is not yet implemented
- [ ] list_finding_aggregations
- [X] list_findings
This call will always return 0 findings by default.
You can use a dedicated API to override this, by configuring a queue of expected results.
A request to `list_findings` will take the first result from that queue, and assign it to the provided arguments. Subsequent calls using the same arguments will return the same result. Other requests using a different SQL-query will take the next result from the queue, or return an empty result if the queue is empty.
Configure this queue by making an HTTP request to `/moto-api/static/inspector2/findings-results`. An example invocation looks like this:
.. sourcecode:: python
findings = {
"results": [
[{
"awsAccountId": "111122223333",
"codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."},
}],
# .. other findings as required
],
"account_id": "123456789012", # This is the default - can be omitted
"region": "us-east-1", # This is the default - can be omitted
}
resp = requests.post(
"http://motoapi.amazonaws.com:5000/moto-api/static/inspector2/findings-results",
json=findings,
)
inspector2 = boto3.client("inspector2", region_name="us-east-1")
findings = inspector2.list_findings()["findings"]
- [X] list_members
- [X] list_tags_for_resource
- [ ] list_usage_totals
- [ ] reset_encryption_key
- [ ] search_vulnerabilities
- [X] tag_resource
- [X] untag_resource
- [ ] update_configuration
- [ ] update_ec2_deep_inspection_configuration
- [ ] update_encryption_key
- [ ] update_filter
- [ ] update_org_ec2_deep_inspection_configuration
- [X] update_organization_configuration

View File

@ -100,13 +100,14 @@ mock_firehose = lazy_load(".firehose", "mock_firehose")
mock_forecast = lazy_load(".forecast", "mock_forecast")
mock_greengrass = lazy_load(".greengrass", "mock_greengrass")
mock_glacier = lazy_load(".glacier", "mock_glacier")
mock_glue = lazy_load(".glue", "mock_glue", boto3_name="glue")
mock_glue = lazy_load(".glue", "mock_glue")
mock_guardduty = lazy_load(".guardduty", "mock_guardduty")
mock_iam = lazy_load(".iam", "mock_iam")
mock_identitystore = lazy_load(".identitystore", "mock_identitystore")
mock_inspector2 = lazy_load(".inspector2", "mock_inspector2")
mock_iot = lazy_load(".iot", "mock_iot")
mock_iotdata = lazy_load(".iotdata", "mock_iotdata", boto3_name="iot-data")
mock_ivs = lazy_load(".ivs", "mock_ivs", boto3_name="ivs")
mock_ivs = lazy_load(".ivs", "mock_ivs")
mock_kinesis = lazy_load(".kinesis", "mock_kinesis")
mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo")
mock_kinesisvideoarchivedmedia = lazy_load(
@ -151,9 +152,7 @@ mock_resourcegroupstaggingapi = lazy_load(
)
mock_robomaker = lazy_load(".robomaker", "mock_robomaker")
mock_route53 = lazy_load(".route53", "mock_route53")
mock_route53resolver = lazy_load(
".route53resolver", "mock_route53resolver", boto3_name="route53resolver"
)
mock_route53resolver = lazy_load(".route53resolver", "mock_route53resolver")
mock_s3 = lazy_load(".s3", "mock_s3")
mock_s3control = lazy_load(".s3control", "mock_s3control")
mock_sagemaker = lazy_load(".sagemaker", "mock_sagemaker")

View File

@ -85,6 +85,7 @@ backend_url_patterns = [
("guardduty", re.compile("https?://guardduty\\.(.+)\\.amazonaws\\.com")),
("iam", re.compile("https?://iam\\.(.*\\.)?amazonaws\\.com")),
("identitystore", re.compile("https?://identitystore\\.(.+)\\.amazonaws\\.com")),
("inspector2", re.compile("https?://inspector2\\.(.+)\\.amazonaws\\.com")),
("iot", re.compile("https?://iot\\.(.+)\\.amazonaws\\.com")),
("iot-data", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")),
("iot-data", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")),

View File

@ -0,0 +1,5 @@
"""inspector2 module initialization; sets value for base decorator."""
from .models import inspector2_backends
from ..core.models import base_decorator
mock_inspector2 = base_decorator(inspector2_backends)

292
moto/inspector2/models.py Normal file
View File

@ -0,0 +1,292 @@
import json
from typing import Any, Dict, List, Optional, Iterable
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core.utils import unix_time
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
class FilterResource(BaseModel):
def __init__(
self,
region: str,
account_id: str,
name: str,
reason: Optional[str],
action: str,
description: Optional[str],
filter_criteria: Dict[str, Any],
backend: "Inspector2Backend",
):
filter_id = mock_random.get_random_hex(10)
self.owner_id = account_id
self.arn = f"arn:aws:inspector2:{region}:{account_id}:owner/{self.owner_id}/filter/{filter_id}"
self.name = name
self.reason = reason
self.action = action
self.description = description
self.filter_criteria = filter_criteria
self.created_at = unix_time()
self.backend = backend
def to_json(self) -> Dict[str, Any]:
return {
"action": self.action,
"arn": self.arn,
"createdAt": self.created_at,
"criteria": self.filter_criteria,
"description": self.description,
"name": self.name,
"ownerId": self.owner_id,
"reason": self.reason,
"tags": self.backend.list_tags_for_resource(self.arn),
}
class AccountStatus(BaseModel):
def __init__(self, account_id: str):
self.account_id = account_id
self.ec2 = "DISABLED"
self.ecr = "DISABLED"
self._lambda = "DISABLED"
self.lambda_code = "DISABLED"
def toggle(self, resource_types: List[str], enable: bool) -> None:
if "EC2" in resource_types:
self.ec2 = "ENABLED" if enable else "DISABLED"
if "ECR" in resource_types:
self.ecr = "ENABLED" if enable else "DISABLED"
if "LAMBDA" in resource_types:
self._lambda = "ENABLED" if enable else "DISABLED"
if "LAMBDA_CODE" in resource_types or "LAMBDACODE" in resource_types:
self.lambda_code = "ENABLED" if enable else "DISABLED"
def to_json(self) -> Dict[str, Any]:
return {
"accountId": self.account_id,
"resourceStatus": {
"ec2": self.ec2,
"ecr": self.ecr,
"lambda": self._lambda,
"lambdaCode": self.lambda_code,
},
"status": self._status(),
}
def _status(self) -> str:
return (
"ENABLED"
if "ENABLED" in [self.ec2, self.ecr, self._lambda, self.lambda_code]
else "DISABLED"
)
def to_batch_json(self) -> Dict[str, Any]:
return {
"accountId": self.account_id,
"resourceState": {
"ec2": {"status": self.ec2},
"ecr": {"status": self.ecr},
"lambda": {"status": self._lambda},
"lambdaCode": {"status": self.lambda_code},
},
"state": {"status": self._status()},
}
class Member(BaseModel):
def __init__(self, account_id: str, admin_account_id: str):
self.account_id = account_id
self.admin_account_id = admin_account_id
self.status = "ENABLED"
self.updated_at = unix_time()
def to_json(self) -> Dict[str, Any]:
return {
"accountId": self.account_id,
"delegatedAdminAccountId": self.admin_account_id,
"relationshipStatus": self.status,
"updatedAt": self.updated_at,
}
class Inspector2Backend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.filters: Dict[str, FilterResource] = dict()
self.admin_accounts: Dict[str, str] = dict()
self.account_status: Dict[str, AccountStatus] = dict()
self.members: Dict[str, Member] = dict()
self.org_config = {
"ec2": False,
"ecr": False,
"lambda": False,
"lambdaCode": False,
}
self.tagger = TaggingService()
self.findings_queue: List[Any] = []
self.findings: Dict[str, Any] = {}
def create_filter(
self,
action: str,
description: str,
filter_criteria: Dict[str, Any],
name: str,
reason: str,
tags: Dict[str, str],
) -> str:
_filter = FilterResource(
region=self.region_name,
account_id=self.account_id,
action=action,
description=description,
filter_criteria=filter_criteria,
name=name,
reason=reason,
backend=self,
)
self.filters[_filter.arn] = _filter
self.tag_resource(_filter.arn, tags)
return _filter.arn
def delete_filter(self, arn: str) -> None:
self.filters.pop(arn, None)
def list_filters(self, action: str, arns: List[str]) -> Iterable[FilterResource]:
"""
Pagination is not yet implemented
"""
return [
f
for f in self.filters.values()
if (arns and f.arn in arns)
or (action and f.action == action)
or (not arns and not action)
]
def list_findings(
self,
filter_criteria: List[Dict[str, Any]],
max_results: str,
next_token: str,
sort_criteria: str,
) -> List[Dict[str, Any]]:
"""
This call will always return 0 findings by default.
You can use a dedicated API to override this, by configuring a queue of expected results.
A request to `list_findings` will take the first result from that queue, and assign it to the provided arguments. Subsequent calls using the same arguments will return the same result. Other requests using a different SQL-query will take the next result from the queue, or return an empty result if the queue is empty.
Configure this queue by making an HTTP request to `/moto-api/static/inspector2/findings-results`. An example invocation looks like this:
.. sourcecode:: python
findings = {
"results": [
[{
"awsAccountId": "111122223333",
"codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."},
}],
# .. other findings as required
],
"account_id": "123456789012", # This is the default - can be omitted
"region": "us-east-1", # This is the default - can be omitted
}
resp = requests.post(
"http://motoapi.amazonaws.com:5000/moto-api/static/inspector2/findings-results",
json=findings,
)
inspector2 = boto3.client("inspector2", region_name="us-east-1")
findings = inspector2.list_findings()["findings"]
"""
key = f"{json.dumps(filter_criteria)}--{max_results}--{next_token}--{sort_criteria}"
if key not in self.findings and self.findings_queue:
self.findings[key] = self.findings_queue.pop(0)
if key in self.findings:
return self.findings[key]
else:
return []
def list_delegated_admin_accounts(self) -> Dict[str, str]:
return self.admin_accounts
def enable_delegated_admin_account(self, account_id: str) -> None:
self.admin_accounts[account_id] = "ENABLED"
def disable_delegated_admin_account(self, account_id: str) -> None:
self.admin_accounts[account_id] = "DISABLED"
def describe_organization_configuration(self) -> Dict[str, Any]:
return {"autoEnable": self.org_config, "maxAccountLimitReached": False}
def update_organization_configuration(
self, auto_enable: Dict[str, bool]
) -> Dict[str, Any]:
self.org_config.update(auto_enable)
return {"autoEnable": self.org_config}
def disable(
self, account_ids: List[str], resource_types: List[str]
) -> List[Dict[str, Any]]:
for acct in account_ids:
if acct not in self.account_status:
self.account_status[acct] = AccountStatus(acct)
self.account_status[acct].toggle(resource_types, enable=False)
return [
status.to_json()
for a_id, status in self.account_status.items()
if a_id in account_ids
]
def enable(
self, account_ids: List[str], resource_types: List[str]
) -> List[Dict[str, Any]]:
for acct in account_ids:
if acct not in self.account_status:
self.account_status[acct] = AccountStatus(acct)
self.account_status[acct].toggle(resource_types, enable=True)
return [
status.to_json()
for a_id, status in self.account_status.items()
if a_id in account_ids
]
def batch_get_account_status(self, account_ids: List[str]) -> List[Dict[str, Any]]:
return [
status.to_batch_json()
for a_id, status in self.account_status.items()
if a_id in account_ids
]
def list_members(self) -> Iterable[Member]:
return self.members.values()
def associate_member(self, account_id: str) -> None:
self.members[account_id] = Member(
account_id=account_id, admin_account_id=self.account_id
)
def disassociate_member(self, account_id: str) -> None:
self.members[account_id].status = "DISABLED"
def get_member(self, account_id: str) -> Member:
return self.members[account_id]
def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
self.tagger.tag_resource(
resource_arn, TaggingService.convert_dict_to_tags_input(tags)
)
def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(resource_arn)
def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(arn, tag_keys)
inspector2_backends = BackendDict(Inspector2Backend, "inspector2")

View File

@ -0,0 +1,151 @@
import json
from typing import Any, Dict, List
from urllib.parse import unquote
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import inspector2_backends, Inspector2Backend
class Inspector2Response(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="inspector2")
@property
def inspector2_backend(self) -> Inspector2Backend:
return inspector2_backends[self.current_account][self.region]
def create_filter(self) -> str:
action = self._get_param("action")
description = self._get_param("description")
filter_criteria = self._get_param("filterCriteria")
name = self._get_param("name")
reason = self._get_param("reason")
tags = self._get_param("tags")
arn = self.inspector2_backend.create_filter(
action=action,
description=description,
filter_criteria=filter_criteria,
name=name,
reason=reason,
tags=tags,
)
return json.dumps(dict(arn=arn))
def delete_filter(self) -> str:
arn = self._get_param("arn")
self.inspector2_backend.delete_filter(arn=arn)
return json.dumps(dict(arn=arn))
def list_filters(self) -> str:
action = self._get_param("action")
arns = self._get_param("arns")
filters = self.inspector2_backend.list_filters(action=action, arns=arns)
return json.dumps({"filters": [f.to_json() for f in filters]})
def list_findings(self) -> str:
filter_criteria = self._get_param("filterCriteria")
max_results = self._get_param("maxResults")
next_token = self._get_param("nextToken")
sort_criteria = self._get_param("sortCriteria")
findings = self.inspector2_backend.list_findings(
filter_criteria=filter_criteria,
max_results=max_results,
next_token=next_token,
sort_criteria=sort_criteria,
)
return json.dumps(dict(findings=findings))
def list_delegated_admin_accounts(self) -> str:
accounts = self.inspector2_backend.list_delegated_admin_accounts()
return json.dumps(
{
"delegatedAdminAccounts": [
{"accountId": key, "status": val} for key, val in accounts.items()
]
}
)
def enable_delegated_admin_account(self) -> str:
account_id = self._get_param("delegatedAdminAccountId")
self.inspector2_backend.enable_delegated_admin_account(account_id)
return json.dumps({"delegatedAdminAccountId": account_id})
def disable_delegated_admin_account(self) -> str:
account_id = self._get_param("delegatedAdminAccountId")
self.inspector2_backend.disable_delegated_admin_account(account_id)
return json.dumps({"delegatedAdminAccountId": account_id})
def describe_organization_configuration(self) -> str:
config = self.inspector2_backend.describe_organization_configuration()
return json.dumps(config)
def update_organization_configuration(self) -> str:
auto_enable = self._get_param("autoEnable")
config = self.inspector2_backend.update_organization_configuration(auto_enable)
return json.dumps(config)
def enable(self) -> str:
account_ids = self._get_param("accountIds")
resource_types = self._get_param("resourceTypes")
accounts = self.inspector2_backend.enable(account_ids, resource_types)
failed: List[Dict[str, Any]] = []
return json.dumps({"accounts": accounts, "failedAccounts": failed})
def disable(self) -> str:
account_ids = self._get_param("accountIds")
resource_types = self._get_param("resourceTypes")
accounts = self.inspector2_backend.disable(account_ids, resource_types)
failed: List[Dict[str, Any]] = []
return json.dumps({"accounts": accounts, "failedAccounts": failed})
def batch_get_account_status(self) -> str:
account_ids = self._get_param("accountIds")
accounts = self.inspector2_backend.batch_get_account_status(account_ids)
failed: List[Dict[str, Any]] = []
return json.dumps({"accounts": accounts, "failedAccounts": failed})
def list_members(self) -> str:
members = self.inspector2_backend.list_members()
return json.dumps({"members": [m.to_json() for m in members]})
def associate_member(self) -> str:
account_id = self._get_param("accountId")
self.inspector2_backend.associate_member(account_id)
return json.dumps({"accountId": account_id})
def disassociate_member(self) -> str:
account_id = self._get_param("accountId")
self.inspector2_backend.disassociate_member(account_id)
return json.dumps({"accountId": account_id})
def get_member(self) -> str:
account_id = self._get_param("accountId")
member = self.inspector2_backend.get_member(account_id)
return json.dumps({"member": member.to_json()})
def list_tags_for_resource(self) -> TYPE_RESPONSE:
arn = unquote(self.path.split("/tags/")[-1])
tags = self.inspector2_backend.list_tags_for_resource(arn)
return 200, {}, json.dumps({"tags": tags})
def tag_resource(self) -> TYPE_RESPONSE:
resource_arn = unquote(self.path.split("/tags/")[-1])
tags = self._get_param("tags")
self.inspector2_backend.tag_resource(resource_arn=resource_arn, tags=tags)
return 200, {}, "{}"
def untag_resource(self) -> TYPE_RESPONSE:
resource_arn = unquote(self.path.split("/tags/")[-1])
tag_keys = self.querystring.get("tagKeys")
self.inspector2_backend.untag_resource(resource_arn, tag_keys) # type: ignore
return 200, {}, "{}"
def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.list_tags_for_resource()
if request.method == "POST":
return self.tag_resource()
if request.method == "DELETE":
return self.untag_resource()

29
moto/inspector2/urls.py Normal file
View File

@ -0,0 +1,29 @@
"""inspector2 base URL and path."""
from .responses import Inspector2Response
url_bases = [
r"https?://inspector2\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/delegatedadminaccounts/disable$": Inspector2Response.dispatch,
"{0}/delegatedadminaccounts/enable$": Inspector2Response.dispatch,
"{0}/delegatedadminaccounts/list$": Inspector2Response.dispatch,
"{0}/disable$": Inspector2Response.dispatch,
"{0}/enable$": Inspector2Response.dispatch,
"{0}/filters/create$": Inspector2Response.dispatch,
"{0}/filters/delete$": Inspector2Response.dispatch,
"{0}/filters/list$": Inspector2Response.dispatch,
"{0}/findings/list$": Inspector2Response.dispatch,
"{0}/members/associate$": Inspector2Response.dispatch,
"{0}/members/get": Inspector2Response.dispatch,
"{0}/members/list": Inspector2Response.dispatch,
"{0}/members/disassociate$": Inspector2Response.dispatch,
"{0}/status/batch/get$": Inspector2Response.dispatch,
"{0}/organizationconfiguration/describe$": Inspector2Response.dispatch,
"{0}/organizationconfiguration/update$": Inspector2Response.dispatch,
"{0}/tags/(?P<resource_arn>.+)$": Inspector2Response.method_dispatch(
Inspector2Response.tags # type: ignore
),
}

View File

@ -6,15 +6,11 @@ url_bases = [
r"https?://ivs\.(.+)\.amazonaws\.com",
]
response = IVSResponse()
url_paths = {
"{0}/CreateChannel": response.dispatch,
"{0}/ListChannels": response.dispatch,
"{0}/GetChannel": response.dispatch,
"{0}/BatchGetChannel": response.dispatch,
"{0}/UpdateChannel": response.dispatch,
"{0}/DeleteChannel": response.dispatch,
"{0}/CreateChannel": IVSResponse.dispatch,
"{0}/ListChannels": IVSResponse.dispatch,
"{0}/GetChannel": IVSResponse.dispatch,
"{0}/BatchGetChannel": IVSResponse.dispatch,
"{0}/UpdateChannel": IVSResponse.dispatch,
"{0}/DeleteChannel": IVSResponse.dispatch,
}

View File

@ -83,5 +83,16 @@ class MotoAPIBackend(BaseBackend):
)
)
def set_inspector2_findings_result(
self,
results: Optional[List[List[Dict[str, Any]]]],
account_id: str,
region: str,
) -> None:
from moto.inspector2.models import inspector2_backends
backend = inspector2_backends[account_id][region]
backend.findings_queue.append(results)
moto_api_backend = MotoAPIBackend(region_name="global", account_id=DEFAULT_ACCOUNT_ID)

View File

@ -227,3 +227,25 @@ class MotoAPIResponse(BaseResponse):
region=region,
)
return 201, {}, ""
def set_inspector2_findings_result(
self,
request: Any,
full_url: str, # pylint: disable=unused-argument
headers: Any,
) -> TYPE_RESPONSE:
from .models import moto_api_backend
request_body_size = int(headers["Content-Length"])
body = request.environ["wsgi.input"].read(request_body_size).decode("utf-8")
body = json.loads(body)
account_id = body.get("account_id", DEFAULT_ACCOUNT_ID)
region = body.get("region", "us-east-1")
for result in body.get("results", []):
moto_api_backend.set_inspector2_findings_result(
results=result,
account_id=account_id,
region=region,
)
return 201, {}, ""

View File

@ -13,6 +13,7 @@ url_paths = {
"{0}/moto-api/reset-auth": response_instance.reset_auth_response,
"{0}/moto-api/seed": response_instance.seed,
"{0}/moto-api/static/athena/query-results": response_instance.set_athena_result,
"{0}/moto-api/static/inspector2/findings-results": response_instance.set_inspector2_findings_result,
"{0}/moto-api/static/sagemaker/endpoint-results": response_instance.set_sagemaker_result,
"{0}/moto-api/static/rds-data/statement-results": response_instance.set_rds_data_result,
"{0}/moto-api/state-manager/get-transition": response_instance.get_transition,

View File

View File

@ -0,0 +1,60 @@
import boto3
from moto import mock_inspector2
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_inspector2
def test_create_filter():
client = boto3.client("inspector2", region_name="us-east-2")
resp = client.create_filter(
name="my_first_filter",
reason="because I said so",
action="NONE",
description="my filter",
filterCriteria={
"codeVulnerabilityDetectorName": [{"comparison": "EQUALS", "value": "cvdn"}]
},
)
assert "arn" in resp
@mock_inspector2
def test_list_filters():
client = boto3.client("inspector2", region_name="ap-southeast-1")
assert client.list_filters()["filters"] == []
arn1 = client.create_filter(
name="my_first_filter",
action="NONE",
filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]},
)["arn"]
filters = client.list_filters()["filters"]
assert len(filters) == 1
assert filters[0]["arn"] == arn1
arn2 = client.create_filter(
name="my_second_filter",
action="SUPPRESS",
filterCriteria={"fixAvailable": [{"comparison": "EQUALS", "value": "cvdn"}]},
)["arn"]
filters = client.list_filters()["filters"]
assert len(filters) == 2
filters = client.list_filters(action="SUPPRESS")["filters"]
assert len(filters) == 1
assert filters[0]["arn"] == arn2
filters = client.list_filters(arns=[arn1])["filters"]
assert len(filters) == 1
assert filters[0]["arn"] == arn1
client.delete_filter(arn=arn1)
filters = client.list_filters()["filters"]
assert len(filters) == 1
assert filters[0]["arn"] == arn2

View File

@ -0,0 +1,29 @@
import boto3
from moto import mock_inspector2
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_inspector2
def test_deleted_accounts():
client = boto3.client("inspector2", region_name="us-east-1")
assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == []
resp = client.enable_delegated_admin_account(delegatedAdminAccountId="111111111111")
assert resp["delegatedAdminAccountId"] == "111111111111"
assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == [
{"accountId": "111111111111", "status": "ENABLED"}
]
resp = client.disable_delegated_admin_account(
delegatedAdminAccountId="111111111111"
)
assert resp["delegatedAdminAccountId"] == "111111111111"
assert client.list_delegated_admin_accounts()["delegatedAdminAccounts"] == [
{"accountId": "111111111111", "status": "DISABLED"}
]

View File

@ -0,0 +1,63 @@
import boto3
from moto import mock_inspector2
from tests import DEFAULT_ACCOUNT_ID
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_inspector2
def test_organization_configuration():
client = boto3.client("inspector2", region_name="us-west-1")
resp = client.enable(accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["EC2", "ECR"])
assert resp["accounts"] == [
{
"accountId": DEFAULT_ACCOUNT_ID,
"resourceStatus": {
"ec2": "ENABLED",
"ecr": "ENABLED",
"lambda": "DISABLED",
"lambdaCode": "DISABLED",
},
"status": "ENABLED",
}
]
assert resp["failedAccounts"] == []
resp = client.batch_get_account_status(accountIds=[DEFAULT_ACCOUNT_ID])
assert resp["accounts"] == [
{
"accountId": "123456789012",
"resourceState": {
"ec2": {"status": "ENABLED"},
"ecr": {"status": "ENABLED"},
"lambda": {"status": "DISABLED"},
"lambdaCode": {"status": "DISABLED"},
},
"state": {"status": "ENABLED"},
}
]
assert resp["failedAccounts"] == []
resp = client.disable(
accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["LAMBDA", "ECR"]
)
assert resp["accounts"] == [
{
"accountId": DEFAULT_ACCOUNT_ID,
"resourceStatus": {
"ec2": "ENABLED",
"ecr": "DISABLED",
"lambda": "DISABLED",
"lambdaCode": "DISABLED",
},
"status": "ENABLED",
}
]
client.disable(accountIds=[DEFAULT_ACCOUNT_ID], resourceTypes=["EC2"])
resp = client.batch_get_account_status(accountIds=[DEFAULT_ACCOUNT_ID])
assert resp["accounts"][0]["state"] == {"status": "DISABLED"}

View File

@ -0,0 +1,59 @@
import boto3
import requests
from moto import mock_inspector2, settings
@mock_inspector2
def test_set_findings():
base_url = (
"localhost:5000" if settings.TEST_SERVER_MODE else "motoapi.amazonaws.com"
)
findings = {
"results": [
[
{
"awsAccountId": "111122223333",
"codeVulnerabilityDetails": {"cwes": ["a"], "detectorId": ".."},
}
]
],
"region": "us-west-1",
}
resp = requests.post(
f"http://{base_url}/moto-api/static/inspector2/findings-results",
json=findings,
)
assert resp.status_code == 201
inspector2 = boto3.client("inspector2", region_name="us-west-1")
assert inspector2.list_findings()["findings"] == [
{
"awsAccountId": "111122223333",
"codeVulnerabilityDetails": {
"cwes": ["a"],
"detectorId": "..",
},
}
]
# Calling list_findings with different arguments returns an empty list
assert (
inspector2.list_findings(
filterCriteria={"awsAccountId": [{"comparison": "EQUALS", "value": "x"}]}
)["findings"]
== []
)
# Calling list_findings with original arguments returns original list
assert inspector2.list_findings()["findings"] == [
{
"awsAccountId": "111122223333",
"codeVulnerabilityDetails": {
"cwes": ["a"],
"detectorId": "..",
},
}
]

View File

@ -0,0 +1,22 @@
import boto3
from moto import mock_inspector2
from tests import DEFAULT_ACCOUNT_ID
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_inspector2
def test_members():
client = boto3.client("inspector2", region_name="us-east-1")
assert client.list_members()["members"] == []
resp = client.associate_member(accountId="111111111111")
assert resp["accountId"] == "111111111111"
resp = client.get_member(accountId="111111111111")["member"]
assert resp["accountId"] == "111111111111"
assert resp["delegatedAdminAccountId"] == DEFAULT_ACCOUNT_ID
assert resp["relationshipStatus"] == "ENABLED"

View File

@ -0,0 +1,45 @@
import boto3
from moto import mock_inspector2
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_inspector2
def test_organization_configuration():
client = boto3.client("inspector2", region_name="us-west-1")
resp = client.describe_organization_configuration()
assert resp["autoEnable"] == {
"ec2": False,
"ecr": False,
"lambda": False,
"lambdaCode": False,
}
assert resp["maxAccountLimitReached"] is False
resp = client.update_organization_configuration(
autoEnable={
"ec2": True,
"ecr": False,
"lambda": True,
"lambdaCode": False,
}
)
assert resp["autoEnable"] == {
"ec2": True,
"ecr": False,
"lambda": True,
"lambdaCode": False,
}
resp = client.describe_organization_configuration()
assert resp["autoEnable"] == {
"ec2": True,
"ecr": False,
"lambda": True,
"lambdaCode": False,
}

View File

@ -0,0 +1,49 @@
import boto3
from moto import mock_inspector2
@mock_inspector2
def test_tag_resource():
client = boto3.client("inspector2", region_name="ap-southeast-1")
arn = client.create_filter(
name="my_first_filter",
action="NONE",
filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]},
)["arn"]
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {}
client.tag_resource(resourceArn=arn, tags={"k1": "v1"})
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k1": "v1"}
client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
"k1": "v1",
"k2": "v2",
}
client.untag_resource(resourceArn=arn, tagKeys=["k1"])
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"}
@mock_inspector2
def test_tag_filter():
client = boto3.client("inspector2", region_name="ap-southeast-1")
arn = client.create_filter(
name="my_first_filter",
action="NONE",
filterCriteria={"findingArn": [{"comparison": "EQUALS", "value": "cvdn"}]},
tags={"k1": "v1"},
)["arn"]
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k1": "v1"}
client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
"k1": "v1",
"k2": "v2",
}
filters = client.list_filters()["filters"]
assert filters[0]["tags"] == {"k1": "v1", "k2": "v2"}