GuardDuty - additional methods (#5175)
This commit is contained in:
parent
76a094b6fb
commit
adeaea7c70
@ -2852,20 +2852,20 @@
|
||||
|
||||
## guardduty
|
||||
<details>
|
||||
<summary>3% implemented</summary>
|
||||
<summary>18% implemented</summary>
|
||||
|
||||
- [ ] accept_invitation
|
||||
- [ ] archive_findings
|
||||
- [X] create_detector
|
||||
- [ ] create_filter
|
||||
- [X] create_filter
|
||||
- [ ] create_ip_set
|
||||
- [ ] create_members
|
||||
- [ ] create_publishing_destination
|
||||
- [ ] create_sample_findings
|
||||
- [ ] create_threat_intel_set
|
||||
- [ ] decline_invitations
|
||||
- [ ] delete_detector
|
||||
- [ ] delete_filter
|
||||
- [X] delete_detector
|
||||
- [X] delete_filter
|
||||
- [ ] delete_invitations
|
||||
- [ ] delete_ip_set
|
||||
- [ ] delete_members
|
||||
@ -2876,9 +2876,9 @@
|
||||
- [ ] disable_organization_admin_account
|
||||
- [ ] disassociate_from_master_account
|
||||
- [ ] disassociate_members
|
||||
- [ ] enable_organization_admin_account
|
||||
- [ ] get_detector
|
||||
- [ ] get_filter
|
||||
- [X] enable_organization_admin_account
|
||||
- [X] get_detector
|
||||
- [X] get_filter
|
||||
- [ ] get_findings
|
||||
- [ ] get_findings_statistics
|
||||
- [ ] get_invitations_count
|
||||
@ -2895,7 +2895,7 @@
|
||||
- [ ] list_invitations
|
||||
- [ ] list_ip_sets
|
||||
- [ ] list_members
|
||||
- [ ] list_organization_admin_accounts
|
||||
- [X] list_organization_admin_accounts
|
||||
- [ ] list_publishing_destinations
|
||||
- [ ] list_tags_for_resource
|
||||
- [ ] list_threat_intel_sets
|
||||
@ -2904,8 +2904,8 @@
|
||||
- [ ] tag_resource
|
||||
- [ ] unarchive_findings
|
||||
- [ ] untag_resource
|
||||
- [ ] update_detector
|
||||
- [ ] update_filter
|
||||
- [X] update_detector
|
||||
- [X] update_filter
|
||||
- [ ] update_findings_feedback
|
||||
- [ ] update_ip_set
|
||||
- [ ] update_member_detectors
|
||||
|
@ -28,15 +28,15 @@ guardduty
|
||||
- [ ] accept_invitation
|
||||
- [ ] archive_findings
|
||||
- [X] create_detector
|
||||
- [ ] create_filter
|
||||
- [X] create_filter
|
||||
- [ ] create_ip_set
|
||||
- [ ] create_members
|
||||
- [ ] create_publishing_destination
|
||||
- [ ] create_sample_findings
|
||||
- [ ] create_threat_intel_set
|
||||
- [ ] decline_invitations
|
||||
- [ ] delete_detector
|
||||
- [ ] delete_filter
|
||||
- [X] delete_detector
|
||||
- [X] delete_filter
|
||||
- [ ] delete_invitations
|
||||
- [ ] delete_ip_set
|
||||
- [ ] delete_members
|
||||
@ -47,9 +47,9 @@ guardduty
|
||||
- [ ] disable_organization_admin_account
|
||||
- [ ] disassociate_from_master_account
|
||||
- [ ] disassociate_members
|
||||
- [ ] enable_organization_admin_account
|
||||
- [ ] get_detector
|
||||
- [ ] get_filter
|
||||
- [X] enable_organization_admin_account
|
||||
- [X] get_detector
|
||||
- [X] get_filter
|
||||
- [ ] get_findings
|
||||
- [ ] get_findings_statistics
|
||||
- [ ] get_invitations_count
|
||||
@ -70,7 +70,11 @@ guardduty
|
||||
- [ ] list_invitations
|
||||
- [ ] list_ip_sets
|
||||
- [ ] list_members
|
||||
- [ ] list_organization_admin_accounts
|
||||
- [X] list_organization_admin_accounts
|
||||
|
||||
Pagination is not yet implemented
|
||||
|
||||
|
||||
- [ ] list_publishing_destinations
|
||||
- [ ] list_tags_for_resource
|
||||
- [ ] list_threat_intel_sets
|
||||
@ -79,8 +83,8 @@ guardduty
|
||||
- [ ] tag_resource
|
||||
- [ ] unarchive_findings
|
||||
- [ ] untag_resource
|
||||
- [ ] update_detector
|
||||
- [ ] update_filter
|
||||
- [X] update_detector
|
||||
- [X] update_filter
|
||||
- [ ] update_findings_feedback
|
||||
- [ ] update_ip_set
|
||||
- [ ] update_member_detectors
|
||||
|
31
moto/guardduty/exceptions.py
Normal file
31
moto/guardduty/exceptions.py
Normal file
@ -0,0 +1,31 @@
|
||||
from moto.core.exceptions import JsonRESTError
|
||||
|
||||
|
||||
class GuardDutyException(JsonRESTError):
|
||||
pass
|
||||
|
||||
|
||||
class DetectorNotFoundException(GuardDutyException):
|
||||
code = 400
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
"InvalidInputException",
|
||||
"The request is rejected because the input detectorId is not owned by the current account.",
|
||||
)
|
||||
|
||||
def get_headers(self, *args, **kwargs): # pylint: disable=unused-argument
|
||||
return {"X-Amzn-ErrorType": "BadRequestException"}
|
||||
|
||||
|
||||
class FilterNotFoundException(GuardDutyException):
|
||||
code = 400
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
"InvalidInputException",
|
||||
"The request is rejected since no such resource found.",
|
||||
)
|
||||
|
||||
def get_headers(self, *args, **kwargs): # pylint: disable=unused-argument
|
||||
return {"X-Amzn-ErrorType": "BadRequestException"}
|
@ -1,14 +1,16 @@
|
||||
from __future__ import unicode_literals
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
from moto.core.utils import BackendDict
|
||||
from moto.core import BaseBackend, BaseModel, get_account_id
|
||||
from moto.core.utils import BackendDict, get_random_hex
|
||||
from datetime import datetime
|
||||
from uuid import uuid4
|
||||
|
||||
from .exceptions import DetectorNotFoundException, FilterNotFoundException
|
||||
|
||||
|
||||
class GuardDutyBackend(BaseBackend):
|
||||
def __init__(self, region_name=None):
|
||||
super().__init__()
|
||||
self.region_name = region_name
|
||||
self.admin_account_ids = []
|
||||
self.detectors = {}
|
||||
|
||||
def reset(self):
|
||||
@ -24,19 +26,39 @@ class GuardDutyBackend(BaseBackend):
|
||||
]:
|
||||
finding_publishing_frequency = "SIX_HOURS"
|
||||
|
||||
service_role = "AWSServiceRoleForAmazonGuardDuty"
|
||||
detector = Detector(
|
||||
self,
|
||||
datetime.now,
|
||||
finding_publishing_frequency,
|
||||
service_role,
|
||||
enable,
|
||||
data_sources,
|
||||
tags,
|
||||
created_at=datetime.now(),
|
||||
finding_publish_freq=finding_publishing_frequency,
|
||||
enabled=enable,
|
||||
datasources=data_sources,
|
||||
tags=tags,
|
||||
)
|
||||
self.detectors[detector.id] = detector
|
||||
return detector.id
|
||||
|
||||
def create_filter(
|
||||
self, detector_id, name, action, description, finding_criteria, rank
|
||||
):
|
||||
detector = self.get_detector(detector_id)
|
||||
_filter = Filter(name, action, description, finding_criteria, rank)
|
||||
detector.add_filter(_filter)
|
||||
|
||||
def delete_detector(self, detector_id):
|
||||
self.detectors.pop(detector_id, None)
|
||||
|
||||
def delete_filter(self, detector_id, filter_name):
|
||||
detector = self.get_detector(detector_id)
|
||||
detector.delete_filter(filter_name)
|
||||
|
||||
def enable_organization_admin_account(self, admin_account_id):
|
||||
self.admin_account_ids.append(admin_account_id)
|
||||
|
||||
def list_organization_admin_accounts(self):
|
||||
"""
|
||||
Pagination is not yet implemented
|
||||
"""
|
||||
return self.admin_account_ids
|
||||
|
||||
def list_detectors(self):
|
||||
"""
|
||||
The MaxResults and NextToken-parameter have not yet been implemented.
|
||||
@ -46,26 +68,139 @@ class GuardDutyBackend(BaseBackend):
|
||||
detectorids.append(self.detectors[detector].id)
|
||||
return detectorids
|
||||
|
||||
def get_detector(self, detector_id):
|
||||
if detector_id not in self.detectors:
|
||||
raise DetectorNotFoundException
|
||||
return self.detectors[detector_id]
|
||||
|
||||
def get_filter(self, detector_id, filter_name):
|
||||
detector = self.get_detector(detector_id)
|
||||
return detector.get_filter(filter_name)
|
||||
|
||||
def update_detector(
|
||||
self, detector_id, enable, finding_publishing_frequency, data_sources
|
||||
):
|
||||
detector = self.get_detector(detector_id)
|
||||
detector.update(enable, finding_publishing_frequency, data_sources)
|
||||
|
||||
def update_filter(
|
||||
self, detector_id, filter_name, action, description, finding_criteria, rank
|
||||
):
|
||||
detector = self.get_detector(detector_id)
|
||||
detector.update_filter(
|
||||
filter_name,
|
||||
action=action,
|
||||
description=description,
|
||||
finding_criteria=finding_criteria,
|
||||
rank=rank,
|
||||
)
|
||||
|
||||
|
||||
class Filter(BaseModel):
|
||||
def __init__(self, name, action, description, finding_criteria, rank):
|
||||
self.name = name
|
||||
self.action = action
|
||||
self.description = description
|
||||
self.finding_criteria = finding_criteria
|
||||
self.rank = rank or 1
|
||||
|
||||
def update(self, action, description, finding_criteria, rank):
|
||||
if action is not None:
|
||||
self.action = action
|
||||
if description is not None:
|
||||
self.description = description
|
||||
if finding_criteria is not None:
|
||||
self.finding_criteria = finding_criteria
|
||||
if rank is not None:
|
||||
self.rank = rank
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"name": self.name,
|
||||
"action": self.action,
|
||||
"description": self.description,
|
||||
"findingCriteria": self.finding_criteria,
|
||||
"rank": self.rank,
|
||||
}
|
||||
|
||||
|
||||
class Detector(BaseModel):
|
||||
def __init__(
|
||||
self,
|
||||
created_at,
|
||||
finding_publish_freq,
|
||||
service_role,
|
||||
status,
|
||||
updated_at,
|
||||
enabled,
|
||||
datasources,
|
||||
tags,
|
||||
):
|
||||
self.id = str(uuid4())
|
||||
self.id = get_random_hex(length=32)
|
||||
self.created_at = created_at
|
||||
self.finding_publish_freq = finding_publish_freq
|
||||
self.service_role = service_role
|
||||
self.status = status
|
||||
self.updated_at = updated_at
|
||||
self.datasources = datasources
|
||||
self.tags = tags
|
||||
self.service_role = f"arn:aws:iam::{get_account_id()}:role/aws-service-role/guardduty.amazonaws.com/AWSServiceRoleForAmazonGuardDuty"
|
||||
self.enabled = enabled
|
||||
self.updated_at = created_at
|
||||
self.datasources = datasources or {}
|
||||
self.tags = tags or {}
|
||||
|
||||
self.filters = dict()
|
||||
|
||||
def add_filter(self, _filter: Filter):
|
||||
self.filters[_filter.name] = _filter
|
||||
|
||||
def delete_filter(self, filter_name):
|
||||
self.filters.pop(filter_name, None)
|
||||
|
||||
def get_filter(self, filter_name: str):
|
||||
if filter_name not in self.filters:
|
||||
raise FilterNotFoundException
|
||||
return self.filters[filter_name]
|
||||
|
||||
def update_filter(self, filter_name, action, description, finding_criteria, rank):
|
||||
_filter = self.get_filter(filter_name)
|
||||
_filter.update(
|
||||
action=action,
|
||||
description=description,
|
||||
finding_criteria=finding_criteria,
|
||||
rank=rank,
|
||||
)
|
||||
|
||||
def update(self, enable, finding_publishing_frequency, data_sources):
|
||||
if enable is not None:
|
||||
self.enabled = enable
|
||||
if finding_publishing_frequency is not None:
|
||||
self.finding_publish_freq = finding_publishing_frequency
|
||||
if data_sources is not None:
|
||||
self.datasources = data_sources
|
||||
|
||||
def to_json(self):
|
||||
data_sources = {
|
||||
"cloudTrail": {"status": "DISABLED"},
|
||||
"dnsLogs": {"status": "DISABLED"},
|
||||
"flowLogs": {"status": "DISABLED"},
|
||||
"s3Logs": {
|
||||
"status": "ENABLED"
|
||||
if (self.datasources.get("s3Logs") or {}).get("enable")
|
||||
else "DISABLED"
|
||||
},
|
||||
"kubernetes": {
|
||||
"auditLogs": {
|
||||
"status": "ENABLED"
|
||||
if self.datasources.get("kubernetes", {})
|
||||
.get("auditLogs", {})
|
||||
.get("enable")
|
||||
else "DISABLED"
|
||||
}
|
||||
},
|
||||
}
|
||||
return {
|
||||
"createdAt": self.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
|
||||
"findingPublishingFrequency": self.finding_publish_freq,
|
||||
"serviceRole": self.service_role,
|
||||
"status": "ENABLED" if self.enabled else "DISABLED",
|
||||
"updatedAt": self.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
|
||||
"dataSources": data_sources,
|
||||
"tags": self.tags,
|
||||
}
|
||||
|
||||
|
||||
guardduty_backends = BackendDict(GuardDutyBackend, "guardduty")
|
||||
|
@ -2,6 +2,7 @@ from __future__ import unicode_literals
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import guardduty_backends
|
||||
import json
|
||||
from urllib.parse import unquote
|
||||
|
||||
|
||||
class GuardDutyResponse(BaseResponse):
|
||||
@ -11,7 +12,21 @@ class GuardDutyResponse(BaseResponse):
|
||||
def guardduty_backend(self):
|
||||
return guardduty_backends[self.region]
|
||||
|
||||
def detector(self, request, full_url, headers):
|
||||
def filter(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "GET":
|
||||
return self.get_filter()
|
||||
elif request.method == "DELETE":
|
||||
return self.delete_filter()
|
||||
elif request.method == "POST":
|
||||
return self.update_filter()
|
||||
|
||||
def filters(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "POST":
|
||||
return self.create_filter()
|
||||
|
||||
def detectors(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "POST":
|
||||
return self.create_detector()
|
||||
@ -20,6 +35,28 @@ class GuardDutyResponse(BaseResponse):
|
||||
else:
|
||||
return 404, {}, ""
|
||||
|
||||
def detector(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "GET":
|
||||
return self.get_detector()
|
||||
elif request.method == "DELETE":
|
||||
return self.delete_detector()
|
||||
elif request.method == "POST":
|
||||
return self.update_detector()
|
||||
|
||||
def create_filter(self):
|
||||
detector_id = self.path.split("/")[-2]
|
||||
name = self._get_param("name")
|
||||
action = self._get_param("action")
|
||||
description = self._get_param("description")
|
||||
finding_criteria = self._get_param("findingCriteria")
|
||||
rank = self._get_param("rank")
|
||||
|
||||
self.guardduty_backend.create_filter(
|
||||
detector_id, name, action, description, finding_criteria, rank
|
||||
)
|
||||
return 200, {}, json.dumps({"name": name})
|
||||
|
||||
def create_detector(self):
|
||||
enable = self._get_param("enable")
|
||||
finding_publishing_frequency = self._get_param("findingPublishingFrequency")
|
||||
@ -32,7 +69,88 @@ class GuardDutyResponse(BaseResponse):
|
||||
|
||||
return 200, {}, json.dumps(dict(detectorId=detector_id))
|
||||
|
||||
def delete_detector(self):
|
||||
detector_id = self.path.split("/")[-1]
|
||||
|
||||
self.guardduty_backend.delete_detector(detector_id)
|
||||
return 200, {}, "{}"
|
||||
|
||||
def delete_filter(self):
|
||||
detector_id = self.path.split("/")[-3]
|
||||
filter_name = unquote(self.path.split("/")[-1])
|
||||
|
||||
self.guardduty_backend.delete_filter(detector_id, filter_name)
|
||||
return 200, {}, "{}"
|
||||
|
||||
def enable_organization_admin_account(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
|
||||
admin_account = self._get_param("adminAccountId")
|
||||
self.guardduty_backend.enable_organization_admin_account(admin_account)
|
||||
|
||||
return 200, {}, "{}"
|
||||
|
||||
def list_organization_admin_accounts(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
|
||||
account_ids = self.guardduty_backend.list_organization_admin_accounts()
|
||||
|
||||
return (
|
||||
200,
|
||||
{},
|
||||
json.dumps(
|
||||
{
|
||||
"adminAccounts": [
|
||||
{"adminAccountId": account_id, "adminStatus": "ENABLED"}
|
||||
for account_id in account_ids
|
||||
]
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
def list_detectors(self):
|
||||
detector_ids = self.guardduty_backend.list_detectors()
|
||||
|
||||
return 200, {}, json.dumps({"detectorIds": detector_ids})
|
||||
|
||||
def get_detector(self):
|
||||
detector_id = self.path.split("/")[-1]
|
||||
|
||||
detector = self.guardduty_backend.get_detector(detector_id)
|
||||
return 200, {}, json.dumps(detector.to_json())
|
||||
|
||||
def get_filter(self):
|
||||
detector_id = self.path.split("/")[-3]
|
||||
filter_name = unquote(self.path.split("/")[-1])
|
||||
|
||||
_filter = self.guardduty_backend.get_filter(detector_id, filter_name)
|
||||
return 200, {}, json.dumps(_filter.to_json())
|
||||
|
||||
def update_detector(self):
|
||||
detector_id = self.path.split("/")[-1]
|
||||
enable = self._get_param("enable")
|
||||
finding_publishing_frequency = self._get_param("findingPublishingFrequency")
|
||||
data_sources = self._get_param("dataSources")
|
||||
|
||||
self.guardduty_backend.update_detector(
|
||||
detector_id, enable, finding_publishing_frequency, data_sources
|
||||
)
|
||||
return 200, {}, "{}"
|
||||
|
||||
def update_filter(self):
|
||||
detector_id = self.path.split("/")[-3]
|
||||
filter_name = unquote(self.path.split("/")[-1])
|
||||
action = self._get_param("action")
|
||||
description = self._get_param("description")
|
||||
finding_criteria = self._get_param("findingCriteria")
|
||||
rank = self._get_param("rank")
|
||||
|
||||
self.guardduty_backend.update_filter(
|
||||
detector_id,
|
||||
filter_name,
|
||||
action=action,
|
||||
description=description,
|
||||
finding_criteria=finding_criteria,
|
||||
rank=rank,
|
||||
)
|
||||
return 200, {}, json.dumps({"name": filter_name})
|
||||
|
@ -9,5 +9,10 @@ url_bases = [
|
||||
|
||||
|
||||
url_paths = {
|
||||
"{0}/detector$": response.detector,
|
||||
"{0}/detector$": response.detectors,
|
||||
"{0}/detector/(?P<detector_id>[^/]+)$": response.detector,
|
||||
"{0}/detector/(?P<detector_id>[^/]+)/filter$": response.filters,
|
||||
"{0}/detector/(?P<detector_id>[^/]+)/filter/(?P<filter_name>[^/]+)$": response.filter,
|
||||
"{0}/admin/enable$": response.enable_organization_admin_account,
|
||||
"{0}/admin$": response.list_organization_admin_accounts,
|
||||
}
|
||||
|
@ -84,6 +84,9 @@ events:
|
||||
- TestAccEventsConnection
|
||||
- TestAccEventsConnectionDataSource
|
||||
- TestAccEventsPermission
|
||||
guardduty:
|
||||
- TestAccGuardDuty_serial/Detector/basic
|
||||
- TestAccGuardDuty_serial/Filter/basic
|
||||
iam:
|
||||
- TestAccIAMAccessKey_
|
||||
- TestAccIAMAccountAlias_
|
||||
|
@ -1,6 +1,8 @@
|
||||
import boto3
|
||||
import pytest
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
from moto import mock_guardduty
|
||||
|
||||
|
||||
@ -26,6 +28,79 @@ def test_create_detector_with_minimal_params():
|
||||
response["DetectorId"].shouldnt.equal(None)
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_get_detector_with_s3():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(
|
||||
Enable=True,
|
||||
ClientToken="745645734574758463758",
|
||||
FindingPublishingFrequency="ONE_HOUR",
|
||||
DataSources={"S3Logs": {"Enable": True}},
|
||||
Tags={},
|
||||
)["DetectorId"]
|
||||
|
||||
resp = client.get_detector(DetectorId=detector_id)
|
||||
resp.should.have.key("FindingPublishingFrequency").equals("ONE_HOUR")
|
||||
resp.should.have.key("DataSources")
|
||||
resp["DataSources"].should.have.key("S3Logs").equals({"Status": "ENABLED"})
|
||||
resp.should.have.key("CreatedAt")
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_get_detector_with_all_data_sources():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(
|
||||
Enable=True,
|
||||
ClientToken="745645734574758463758",
|
||||
FindingPublishingFrequency="ONE_HOUR",
|
||||
DataSources={
|
||||
"S3Logs": {"Enable": True},
|
||||
"Kubernetes": {"AuditLogs": {"Enable": True}},
|
||||
},
|
||||
Tags={},
|
||||
)["DetectorId"]
|
||||
|
||||
resp = client.get_detector(DetectorId=detector_id)
|
||||
resp.should.have.key("FindingPublishingFrequency").equals("ONE_HOUR")
|
||||
resp.should.have.key("DataSources")
|
||||
resp["DataSources"].should.have.key("S3Logs").equals({"Status": "ENABLED"})
|
||||
resp["DataSources"].should.have.key("Kubernetes")
|
||||
resp["DataSources"]["Kubernetes"].should.have.key("AuditLogs").equals(
|
||||
{"Status": "ENABLED"}
|
||||
)
|
||||
resp.should.have.key("CreatedAt")
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_update_detector():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(
|
||||
Enable=True,
|
||||
ClientToken="745645734574758463758",
|
||||
FindingPublishingFrequency="ONE_HOUR",
|
||||
Tags={},
|
||||
)["DetectorId"]
|
||||
|
||||
client.update_detector(
|
||||
DetectorId=detector_id,
|
||||
Enable=False,
|
||||
FindingPublishingFrequency="SIX_HOURS",
|
||||
DataSources={
|
||||
"S3Logs": {"Enable": True},
|
||||
"Kubernetes": {"AuditLogs": {"Enable": False}},
|
||||
},
|
||||
)
|
||||
|
||||
resp = client.get_detector(DetectorId=detector_id)
|
||||
resp.should.have.key("FindingPublishingFrequency").equals("SIX_HOURS")
|
||||
resp.should.have.key("DataSources")
|
||||
resp["DataSources"].should.have.key("S3Logs").equals({"Status": "ENABLED"})
|
||||
resp["DataSources"].should.have.key("Kubernetes")
|
||||
resp["DataSources"]["Kubernetes"].should.have.key("AuditLogs").equals(
|
||||
{"Status": "DISABLED"}
|
||||
)
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_list_detectors_initial():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
@ -49,3 +124,32 @@ def test_list_detectors():
|
||||
response = client.list_detectors()
|
||||
response.should.have.key("DetectorIds")
|
||||
set(response["DetectorIds"]).should.equal({d1, d2})
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_delete_detector():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(
|
||||
Enable=True,
|
||||
ClientToken="745645734574758463758",
|
||||
FindingPublishingFrequency="ONE_HOUR",
|
||||
DataSources={
|
||||
"S3Logs": {"Enable": True},
|
||||
"Kubernetes": {"AuditLogs": {"Enable": True}},
|
||||
},
|
||||
Tags={},
|
||||
)["DetectorId"]
|
||||
|
||||
client.get_detector(DetectorId=detector_id)
|
||||
|
||||
client.delete_detector(DetectorId=detector_id)
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.get_detector(DetectorId=detector_id)
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("BadRequestException")
|
||||
err["Message"].should.equal(
|
||||
"The request is rejected because the input detectorId is not owned by the current account."
|
||||
)
|
||||
|
||||
client.list_detectors().should.have.key("DetectorIds").equals([])
|
||||
|
97
tests/test_guardduty/test_guardduty_filters.py
Normal file
97
tests/test_guardduty/test_guardduty_filters.py
Normal file
@ -0,0 +1,97 @@
|
||||
import boto3
|
||||
import pytest
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
from moto import mock_guardduty
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_create_filter():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(Enable=True)["DetectorId"]
|
||||
|
||||
resp = client.create_filter(
|
||||
DetectorId=detector_id,
|
||||
Name="my first filter",
|
||||
FindingCriteria={"Criterion": {"x": {"Eq": ["y"]}}},
|
||||
)
|
||||
resp.should.have.key("Name").equals("my first filter")
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_create_filter__defaults():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(Enable=True)["DetectorId"]
|
||||
|
||||
client.create_filter(
|
||||
DetectorId=detector_id,
|
||||
Name="my first filter",
|
||||
FindingCriteria={"Criterion": {"x": {"Eq": ["y"]}}},
|
||||
)
|
||||
|
||||
resp = client.get_filter(DetectorId=detector_id, FilterName="my first filter")
|
||||
resp.should.have.key("Rank").equals(1)
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_get_filter():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(Enable=True)["DetectorId"]
|
||||
|
||||
client.create_filter(
|
||||
DetectorId=detector_id,
|
||||
Name="my first filter",
|
||||
FindingCriteria={"Criterion": {"x": {"Eq": ["y"]}}},
|
||||
)
|
||||
|
||||
resp = client.get_filter(DetectorId=detector_id, FilterName="my first filter")
|
||||
resp.should.have.key("Name").equals("my first filter")
|
||||
resp.should.have.key("FindingCriteria").equals({"Criterion": {"x": {"Eq": ["y"]}}})
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_update_filter():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(Enable=True)["DetectorId"]
|
||||
|
||||
client.create_filter(
|
||||
DetectorId=detector_id,
|
||||
Name="my first filter",
|
||||
FindingCriteria={"Criterion": {"x": {"Eq": ["y"]}}},
|
||||
)
|
||||
|
||||
resp = client.update_filter(
|
||||
DetectorId=detector_id,
|
||||
FilterName="my first filter",
|
||||
Description="with desc",
|
||||
Rank=21,
|
||||
Action="NOOP",
|
||||
)
|
||||
resp.should.have.key("Name").equals("my first filter")
|
||||
|
||||
resp = client.get_filter(DetectorId=detector_id, FilterName="my first filter")
|
||||
resp.should.have.key("Name").equals("my first filter")
|
||||
resp.should.have.key("Description").equals("with desc")
|
||||
resp.should.have.key("Rank").equals(21)
|
||||
resp.should.have.key("Action").equals("NOOP")
|
||||
resp.should.have.key("FindingCriteria").equals({"Criterion": {"x": {"Eq": ["y"]}}})
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_delete_filter():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
detector_id = client.create_detector(Enable=True)["DetectorId"]
|
||||
|
||||
client.create_filter(
|
||||
DetectorId=detector_id,
|
||||
Name="my first filter",
|
||||
FindingCriteria={"Criterion": {"x": {"Eq": ["y"]}}},
|
||||
)
|
||||
|
||||
client.delete_filter(DetectorId=detector_id, FilterName="my first filter")
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.get_filter(DetectorId=detector_id, FilterName="my first filter")
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("BadRequestException")
|
24
tests/test_guardduty/test_guardduty_organization.py
Normal file
24
tests/test_guardduty/test_guardduty_organization.py
Normal file
@ -0,0 +1,24 @@
|
||||
import boto3
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
|
||||
from moto import mock_guardduty
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_enable_organization_admin_account():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
resp = client.enable_organization_admin_account(AdminAccountId="")
|
||||
resp.should.have.key("ResponseMetadata")
|
||||
resp["ResponseMetadata"].should.have.key("HTTPStatusCode").equals(200)
|
||||
|
||||
|
||||
@mock_guardduty
|
||||
def test_list_organization_admin_accounts():
|
||||
client = boto3.client("guardduty", region_name="us-east-1")
|
||||
client.enable_organization_admin_account(AdminAccountId="someaccount")
|
||||
|
||||
resp = client.list_organization_admin_accounts()
|
||||
resp.should.have.key("AdminAccounts").length_of(1)
|
||||
resp["AdminAccounts"].should.contain(
|
||||
{"AdminAccountId": "someaccount", "AdminStatus": "ENABLED"}
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user