| import boto3
 | |
| import pytest
 | |
| import sure  # noqa # pylint: disable=unused-import
 | |
| 
 | |
| from botocore.exceptions import ClientError
 | |
| from moto import mock_guardduty
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_create_detector():
    """create_detector returns a non-null DetectorId for a fully specified request.

    The request sets the publishing frequency, enables the S3Logs data
    source and passes an empty tag map.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    # ClientToken is the request's idempotency token, not a credential.
    create_kwargs = {
        "Enable": True,
        "ClientToken": "745645734574758463758",
        "FindingPublishingFrequency": "ONE_HOUR",
        "DataSources": {"S3Logs": {"Enable": True}},
        "Tags": {},
    }
    created = guardduty.create_detector(**create_kwargs)
    created.should.have.key("DetectorId")
    created["DetectorId"].shouldnt.equal(None)
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_create_detector_with_minimal_params():
    """create_detector returns a non-null DetectorId when only Enable is passed."""
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    created = guardduty.create_detector(Enable=True)
    created.should.have.key("DetectorId")
    created["DetectorId"].shouldnt.equal(None)
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_get_detector_with_s3():
    """get_detector reports the S3Logs data source as ENABLED after creation.

    Also checks that the publishing frequency is returned unchanged and
    that a CreatedAt field is present.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    create_kwargs = {
        "Enable": True,
        "ClientToken": "745645734574758463758",
        "FindingPublishingFrequency": "ONE_HOUR",
        "DataSources": {"S3Logs": {"Enable": True}},
        "Tags": {},
    }
    created = guardduty.create_detector(**create_kwargs)
    detector_id = created["DetectorId"]

    described = guardduty.get_detector(DetectorId=detector_id)
    described.should.have.key("FindingPublishingFrequency").equals("ONE_HOUR")
    # Assert the key exists before indexing so a missing key fails as an
    # assertion rather than a KeyError.
    described.should.have.key("DataSources")
    data_sources = described["DataSources"]
    data_sources.should.have.key("S3Logs").equals({"Status": "ENABLED"})
    described.should.have.key("CreatedAt")
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_get_detector_with_all_data_sources():
    """get_detector reports both S3Logs and Kubernetes audit logs as ENABLED.

    The detector is created with both data sources switched on; the
    response must echo each one with Status ENABLED and carry CreatedAt.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    requested_sources = {
        "S3Logs": {"Enable": True},
        "Kubernetes": {"AuditLogs": {"Enable": True}},
    }
    created = guardduty.create_detector(
        Enable=True,
        ClientToken="745645734574758463758",
        FindingPublishingFrequency="ONE_HOUR",
        DataSources=requested_sources,
        Tags={},
    )
    detector_id = created["DetectorId"]

    described = guardduty.get_detector(DetectorId=detector_id)
    described.should.have.key("FindingPublishingFrequency").equals("ONE_HOUR")
    described.should.have.key("DataSources")
    data_sources = described["DataSources"]
    data_sources.should.have.key("S3Logs").equals({"Status": "ENABLED"})
    data_sources.should.have.key("Kubernetes")
    kubernetes = data_sources["Kubernetes"]
    kubernetes.should.have.key("AuditLogs").equals({"Status": "ENABLED"})
    described.should.have.key("CreatedAt")
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_update_detector():
    """update_detector changes the publishing frequency and data-source states.

    A detector created without data sources is updated to SIX_HOURS with
    S3Logs enabled and Kubernetes audit logs disabled; get_detector must
    reflect all three values.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    created = guardduty.create_detector(
        Enable=True,
        ClientToken="745645734574758463758",
        FindingPublishingFrequency="ONE_HOUR",
        Tags={},
    )
    detector_id = created["DetectorId"]

    updated_sources = {
        "S3Logs": {"Enable": True},
        "Kubernetes": {"AuditLogs": {"Enable": False}},
    }
    # NOTE(review): Enable=False switches the detector itself off, but nothing
    # below asserts the resulting detector state. If get_detector exposes a
    # detector-level status in this mock, pin it here so a regression that
    # silently ignores the disable request is caught (confirm field name).
    guardduty.update_detector(
        DetectorId=detector_id,
        Enable=False,
        FindingPublishingFrequency="SIX_HOURS",
        DataSources=updated_sources,
    )

    described = guardduty.get_detector(DetectorId=detector_id)
    described.should.have.key("FindingPublishingFrequency").equals("SIX_HOURS")
    described.should.have.key("DataSources")
    data_sources = described["DataSources"]
    data_sources.should.have.key("S3Logs").equals({"Status": "ENABLED"})
    data_sources.should.have.key("Kubernetes")
    kubernetes = data_sources["Kubernetes"]
    kubernetes.should.have.key("AuditLogs").equals({"Status": "DISABLED"})
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_list_detectors_initial():
    """list_detectors returns an empty DetectorIds list before any detector exists."""
    guardduty = boto3.client("guardduty", region_name="us-east-1")

    listing = guardduty.list_detectors()
    listing.should.have.key("DetectorIds").equals([])
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_list_detectors():
    """list_detectors returns the ids of every detector created, enabled or not.

    One detector is created enabled with full arguments and one disabled
    with only Enable=False; both ids must appear in the listing.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    enabled_kwargs = {
        "Enable": True,
        "ClientToken": "745645734574758463758",
        "FindingPublishingFrequency": "ONE_HOUR",
        "DataSources": {"S3Logs": {"Enable": True}},
        "Tags": {},
    }
    enabled_id = guardduty.create_detector(**enabled_kwargs)["DetectorId"]
    disabled_id = guardduty.create_detector(Enable=False)["DetectorId"]

    listing = guardduty.list_detectors()
    listing.should.have.key("DetectorIds")
    # Compare as sets: the test does not depend on the order of the ids.
    listed_ids = set(listing["DetectorIds"])
    listed_ids.should.equal({enabled_id, disabled_id})
 | |
| 
 | |
| 
 | |
@mock_guardduty
def test_delete_detector():
    """delete_detector removes the detector from both get and list operations.

    After deletion, get_detector must raise a ClientError with code
    BadRequestException and the "not owned by the current account"
    message, and list_detectors must return an empty list.
    """
    guardduty = boto3.client("guardduty", region_name="us-east-1")
    requested_sources = {
        "S3Logs": {"Enable": True},
        "Kubernetes": {"AuditLogs": {"Enable": True}},
    }
    created = guardduty.create_detector(
        Enable=True,
        ClientToken="745645734574758463758",
        FindingPublishingFrequency="ONE_HOUR",
        DataSources=requested_sources,
        Tags={},
    )
    detector_id = created["DetectorId"]

    # Sanity check: the lookup succeeds while the detector still exists.
    guardduty.get_detector(DetectorId=detector_id)

    guardduty.delete_detector(DetectorId=detector_id)

    with pytest.raises(ClientError) as raised:
        guardduty.get_detector(DetectorId=detector_id)
    error = raised.value.response["Error"]
    error["Code"].should.equal("BadRequestException")
    error["Message"].should.equal(
        "The request is rejected because the input detectorId is not owned by the current account."
    )

    remaining = guardduty.list_detectors()
    remaining.should.have.key("DetectorIds").equals([])
 |