moto/moto/cloudtrail/responses.py
2022-10-30 19:55:31 -01:00

194 lines
7.6 KiB
Python

"""Handles incoming cloudtrail requests, invokes methods, returns responses."""
import json
from typing import Any, Dict
from moto.core.responses import BaseResponse
from .models import cloudtrail_backends, CloudTrailBackend
from .exceptions import InvalidParameterCombinationException
class CloudTrailResponse(BaseResponse):
    """Translate CloudTrail API requests into backend calls and JSON replies."""

    def __init__(self) -> None:
        super().__init__(service_name="cloudtrail")

    @property
    def cloudtrail_backend(self) -> CloudTrailBackend:
        """Backend looked up by the current account, then by the region."""
        account_backends = cloudtrail_backends[self.current_account]
        return account_backends[self.region]

    def create_trail(self) -> str:
        """Create a trail from the request parameters and return its description.

        Raises InvalidParameterCombinationException when a multi-region trail
        is requested while global service events are disabled.
        """
        name = self._get_param("Name")
        bucket_name = self._get_param("S3BucketName")
        include_global = self._get_bool_param("IncludeGlobalServiceEvents", True)
        multi_region = self._get_bool_param("IsMultiRegionTrail", False)
        if multi_region and not include_global:
            raise InvalidParameterCombinationException(
                "Multi-Region trail must include global service events."
            )
        # The backend is called positionally, so the tuple order below is the
        # backend's argument order and must not be rearranged.
        backend_args = (
            name,
            bucket_name,
            self._get_param("S3KeyPrefix"),
            self._get_param("SnsTopicName"),
            include_global,
            multi_region,
            self._get_bool_param("EnableLogFileValidation", False),
            self._get_bool_param("IsOrganizationTrail", False),
            self._get_param("CloudWatchLogsLogGroupArn"),
            self._get_param("CloudWatchLogsRoleArn"),
            self._get_param("KmsKeyId"),
            self._get_param("TagsList", []),
        )
        created = self.cloudtrail_backend.create_trail(*backend_args)
        return json.dumps(created.description())

    def get_trail(self) -> str:
        """Return the description of the named trail under the ``Trail`` key."""
        trail_name = self._get_param("Name")
        found = self.cloudtrail_backend.get_trail(trail_name)
        payload = {"Trail": found.description()}
        return json.dumps(payload)

    def get_trail_status(self) -> str:
        """Return the serialized status description of the named trail."""
        trail_name = self._get_param("Name")
        trail_status = self.cloudtrail_backend.get_trail_status(trail_name)
        return json.dumps(trail_status.description())

    def describe_trails(self) -> str:
        """Return region-inclusive descriptions of trails as ``trailList``.

        ``includeShadowTrails`` defaults to True when absent from the request.
        """
        with_shadow = self._get_bool_param("includeShadowTrails", True)
        descriptions = []
        for trail in self.cloudtrail_backend.describe_trails(with_shadow):
            descriptions.append(trail.description(include_region=True))
        return json.dumps({"trailList": descriptions})

    def list_trails(self) -> str:
        """Return the short form of every trail the backend lists."""
        summaries = [trail.short() for trail in self.cloudtrail_backend.list_trails()]
        return json.dumps({"Trails": summaries})

    def start_logging(self) -> str:
        """Ask the backend to start logging for the named trail; reply with ``{}``."""
        trail_name = self._get_param("Name")
        self.cloudtrail_backend.start_logging(trail_name)
        return json.dumps({})

    def stop_logging(self) -> str:
        """Ask the backend to stop logging for the named trail; reply with ``{}``."""
        trail_name = self._get_param("Name")
        self.cloudtrail_backend.stop_logging(trail_name)
        return json.dumps({})

    def delete_trail(self) -> str:
        """Ask the backend to delete the named trail; reply with ``{}``."""
        trail_name = self._get_param("Name")
        self.cloudtrail_backend.delete_trail(trail_name)
        return json.dumps({})

    def update_trail(self) -> str:
        """Forward the update parameters to the backend and return the description.

        Every value is read with ``_get_param`` and passed through as-is, so a
        parameter missing from the request reaches the backend as whatever
        ``_get_param`` yields for it.
        """
        # Backend keyword -> request parameter name, in the order they are read.
        request_names = {
            "name": "Name",
            "s3_bucket_name": "S3BucketName",
            "s3_key_prefix": "S3KeyPrefix",
            "sns_topic_name": "SnsTopicName",
            "include_global_service_events": "IncludeGlobalServiceEvents",
            "is_multi_region_trail": "IsMultiRegionTrail",
            "enable_log_file_validation": "EnableLogFileValidation",
            "is_organization_trail": "IsOrganizationTrail",
            "cw_log_group_arn": "CloudWatchLogsLogGroupArn",
            "cw_role_arn": "CloudWatchLogsRoleArn",
            "kms_key_id": "KmsKeyId",
        }
        update_kwargs = {
            keyword: self._get_param(request_name)
            for keyword, request_name in request_names.items()
        }
        updated = self.cloudtrail_backend.update_trail(**update_kwargs)
        return json.dumps(updated.description())

    def put_event_selectors(self) -> str:
        """Store event selectors for a trail and echo what the backend returns."""
        request = json.loads(self.body)
        outcome = self.cloudtrail_backend.put_event_selectors(
            trail_name=request.get("TrailName"),
            event_selectors=request.get("EventSelectors"),
            advanced_event_selectors=request.get("AdvancedEventSelectors"),
        )
        arn, basic_selectors, advanced_selectors = outcome
        payload = {
            "TrailARN": arn,
            "EventSelectors": basic_selectors,
            "AdvancedEventSelectors": advanced_selectors,
        }
        return json.dumps(payload)

    def get_event_selectors(self) -> str:
        """Return the trail ARN plus its basic and advanced event selectors."""
        request = json.loads(self.body)
        outcome = self.cloudtrail_backend.get_event_selectors(
            trail_name=request.get("TrailName")
        )
        arn, basic_selectors, advanced_selectors = outcome
        payload = {
            "TrailARN": arn,
            "EventSelectors": basic_selectors,
            "AdvancedEventSelectors": advanced_selectors,
        }
        return json.dumps(payload)

    def add_tags(self) -> str:
        """Pass the resource id and tag list to the backend; reply with ``{}``."""
        request = json.loads(self.body)
        target_id = request.get("ResourceId")
        tags = request.get("TagsList")
        self.cloudtrail_backend.add_tags(resource_id=target_id, tags_list=tags)
        return json.dumps({})

    def remove_tags(self) -> str:
        """Pass the resource id and tag list to the backend for removal; reply ``{}``."""
        target_id = self._get_param("ResourceId")
        tags = self._get_param("TagsList")
        self.cloudtrail_backend.remove_tags(resource_id=target_id, tags_list=tags)
        return json.dumps({})

    def list_tags(self) -> str:
        """Return the backend's tag listing for the requested resource ids."""
        request = json.loads(self.body)
        requested_ids = request.get("ResourceIdList")
        tag_listing = self.cloudtrail_backend.list_tags(resource_id_list=requested_ids)
        return json.dumps({"ResourceTagList": tag_listing})

    def put_insight_selectors(self) -> str:
        """Store insight selectors for a trail and echo what the backend returns."""
        requested_trail = self._get_param("TrailName")
        requested_selectors = self._get_param("InsightSelectors")
        arn, stored_selectors = self.cloudtrail_backend.put_insight_selectors(
            trail_name=requested_trail, insight_selectors=requested_selectors
        )
        payload = {"TrailARN": arn, "InsightSelectors": stored_selectors}
        return json.dumps(payload)

    def get_insight_selectors(self) -> str:
        """Return the trail ARN and, when non-empty, its insight selectors."""
        requested_trail = self._get_param("TrailName")
        arn, stored_selectors = self.cloudtrail_backend.get_insight_selectors(
            trail_name=requested_trail
        )
        payload: Dict[str, Any] = {"TrailARN": arn}
        # The key is left out entirely when the backend reports a falsy value.
        if stored_selectors:
            payload["InsightSelectors"] = stored_selectors
        return json.dumps(payload)