Service: CloudTrail (#4410)

This commit is contained in:
Bert Blommers 2021-10-13 22:22:31 +00:00 committed by GitHub
parent f451153c72
commit 6f13132a79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 926 additions and 2 deletions

View File

@ -365,6 +365,30 @@
- [X] validate_template - [X] validate_template
</details> </details>
## cloudtrail
<details>
<summary>44% implemented</summary>
- [ ] add_tags
- [X] create_trail
- [X] delete_trail
- [X] describe_trails
- [ ] get_event_selectors
- [ ] get_insight_selectors
- [X] get_trail
- [X] get_trail_status
- [ ] list_public_keys
- [ ] list_tags
- [X] list_trails
- [ ] lookup_events
- [ ] put_event_selectors
- [ ] put_insight_selectors
- [ ] remove_tags
- [X] start_logging
- [X] stop_logging
- [ ] update_trail
</details>
## cloudwatch ## cloudwatch
<details> <details>
<summary>38% implemented</summary> <summary>38% implemented</summary>
@ -4444,7 +4468,6 @@
- cloudhsmv2 - cloudhsmv2
- cloudsearch - cloudsearch
- cloudsearchdomain - cloudsearchdomain
- cloudtrail
- codeartifact - codeartifact
- codebuild - codebuild
- codedeploy - codedeploy

View File

@ -34,6 +34,7 @@ mock_cloudformation = lazy_load(".cloudformation", "mock_cloudformation")
mock_cloudformation_deprecated = lazy_load( mock_cloudformation_deprecated = lazy_load(
".cloudformation", "mock_cloudformation_deprecated" ".cloudformation", "mock_cloudformation_deprecated"
) )
mock_cloudtrail = lazy_load(".cloudtrail", "mock_cloudtrail", boto3_name="cloudtrail")
mock_cloudwatch = lazy_load(".cloudwatch", "mock_cloudwatch") mock_cloudwatch = lazy_load(".cloudwatch", "mock_cloudwatch")
mock_cloudwatch_deprecated = lazy_load(".cloudwatch", "mock_cloudwatch_deprecated") mock_cloudwatch_deprecated = lazy_load(".cloudwatch", "mock_cloudwatch_deprecated")
mock_codecommit = lazy_load(".codecommit", "mock_codecommit") mock_codecommit = lazy_load(".codecommit", "mock_codecommit")

View File

@ -12,6 +12,7 @@ backend_url_patterns = [
("autoscaling", re.compile("https?://autoscaling\\.(.+)\\.amazonaws\\.com")), ("autoscaling", re.compile("https?://autoscaling\\.(.+)\\.amazonaws\\.com")),
("batch", re.compile("https?://batch\\.(.+)\\.amazonaws.com")), ("batch", re.compile("https?://batch\\.(.+)\\.amazonaws.com")),
("cloudformation", re.compile("https?://cloudformation\\.(.+)\\.amazonaws\\.com")), ("cloudformation", re.compile("https?://cloudformation\\.(.+)\\.amazonaws\\.com")),
("cloudtrail", re.compile("https?://cloudtrail\\.(.+)\\.amazonaws\\.com")),
("cloudwatch", re.compile("https?://monitoring\\.(.+)\\.amazonaws.com")), ("cloudwatch", re.compile("https?://monitoring\\.(.+)\\.amazonaws.com")),
("codecommit", re.compile("https?://codecommit\\.(.+)\\.amazonaws\\.com")), ("codecommit", re.compile("https?://codecommit\\.(.+)\\.amazonaws\\.com")),
("codepipeline", re.compile("https?://codepipeline\\.(.+)\\.amazonaws\\.com")), ("codepipeline", re.compile("https?://codepipeline\\.(.+)\\.amazonaws\\.com")),

View File

@ -0,0 +1,5 @@
"""cloudtrail module initialization; sets value for base decorator."""
from .models import cloudtrail_backends
from ..core.models import base_decorator
mock_cloudtrail = base_decorator(cloudtrail_backends)

View File

@ -0,0 +1,84 @@
"""Exceptions raised by the cloudtrail service."""
from moto.core import ACCOUNT_ID
from moto.core.exceptions import JsonRESTError
class InvalidParameterCombinationException(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidParameterCombinationException, self).__init__(
"InvalidParameterCombinationException", message
)
class S3BucketDoesNotExistException(JsonRESTError):
code = 400
def __init__(self, message):
super(S3BucketDoesNotExistException, self).__init__(
"S3BucketDoesNotExistException", message
)
class InsufficientSnsTopicPolicyException(JsonRESTError):
code = 400
def __init__(self, message):
super(InsufficientSnsTopicPolicyException, self).__init__(
"InsufficientSnsTopicPolicyException", message
)
class TrailNotFoundException(JsonRESTError):
code = 400
def __init__(self, name):
super(TrailNotFoundException, self).__init__(
"TrailNotFoundException",
f"Unknown trail: {name} for the user: {ACCOUNT_ID}",
)
class InvalidTrailNameException(JsonRESTError):
code = 400
def __init__(self, message):
super(InvalidTrailNameException, self).__init__(
"InvalidTrailNameException", message
)
class TrailNameTooShort(InvalidTrailNameException):
def __init__(self, actual_length):
super(TrailNameTooShort, self).__init__(
f"Trail name too short. Minimum allowed length: 3 characters. Specified name length: {actual_length} characters."
)
class TrailNameTooLong(InvalidTrailNameException):
def __init__(self, actual_length):
super(TrailNameTooLong, self).__init__(
f"Trail name too long. Maximum allowed length: 128 characters. Specified name length: {actual_length} characters."
)
class TrailNameNotStartingCorrectly(InvalidTrailNameException):
def __init__(self):
super(TrailNameNotStartingCorrectly, self).__init__(
"Trail name must starts with a letter or number."
)
class TrailNameNotEndingCorrectly(InvalidTrailNameException):
def __init__(self):
super(TrailNameNotEndingCorrectly, self).__init__(
"Trail name must ends with a letter or number."
)
class TrailNameInvalidChars(InvalidTrailNameException):
def __init__(self):
super(TrailNameInvalidChars, self).__init__(
"Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_)."
)

266
moto/cloudtrail/models.py Normal file
View File

@ -0,0 +1,266 @@
import re
import time
from boto3 import Session
from datetime import datetime
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds
from .exceptions import (
S3BucketDoesNotExistException,
InsufficientSnsTopicPolicyException,
TrailNameTooLong,
TrailNameTooShort,
TrailNameNotStartingCorrectly,
TrailNameNotEndingCorrectly,
TrailNameInvalidChars,
TrailNotFoundException,
)
def datetime2int(date):
return int(time.mktime(date.timetuple()))
class TrailStatus(object):
def __init__(self):
self.is_logging = False
self.latest_delivery_time = ""
self.latest_delivery_attempt = ""
self.start_logging_time = None
self.started = None
self.stopped = None
def start_logging(self):
self.is_logging = True
self.started = datetime.utcnow()
self.latest_delivery_time = datetime2int(datetime.utcnow())
self.latest_delivery_attempt = iso_8601_datetime_without_milliseconds(
datetime.utcnow()
)
def stop_logging(self):
self.is_logging = False
self.stopped = datetime.utcnow()
def description(self):
if self.is_logging:
self.latest_delivery_time = datetime2int(datetime.utcnow())
self.latest_delivery_attempt = iso_8601_datetime_without_milliseconds(
datetime.utcnow()
)
desc = {
"IsLogging": self.is_logging,
"LatestDeliveryAttemptTime": self.latest_delivery_attempt,
"LatestNotificationAttemptTime": "",
"LatestNotificationAttemptSucceeded": "",
"LatestDeliveryAttemptSucceeded": "",
"TimeLoggingStarted": "",
"TimeLoggingStopped": "",
}
if self.started:
desc["StartLoggingTime"] = datetime2int(self.started)
desc["TimeLoggingStarted"] = iso_8601_datetime_without_milliseconds(
self.started
)
desc["LatestDeliveryTime"] = self.latest_delivery_time
if self.stopped:
desc["StopLoggingTime"] = datetime2int(self.stopped)
desc["TimeLoggingStopped"] = iso_8601_datetime_without_milliseconds(
self.stopped
)
return desc
class Trail(BaseModel):
def __init__(
self,
region_name,
trail_name,
bucket_name,
s3_key_prefix,
sns_topic_name,
is_multi_region,
log_validation,
is_org_trail,
):
self.region_name = region_name
self.trail_name = trail_name
self.bucket_name = bucket_name
self.s3_key_prefix = s3_key_prefix
self.sns_topic_name = sns_topic_name
self.is_multi_region = is_multi_region
self.log_validation = log_validation
self.is_org_trail = is_org_trail
self.check_name()
self.check_bucket_exists()
self.check_topic_exists()
self.status = TrailStatus()
@property
def arn(self):
return f"arn:aws:cloudtrail:{self.region_name}:{ACCOUNT_ID}:trail/{self.trail_name}"
@property
def topic_arn(self):
if self.sns_topic_name:
return f"arn:aws:sns:{self.region_name}:{ACCOUNT_ID}:{self.sns_topic_name}"
return None
def check_name(self):
if len(self.trail_name) < 3:
raise TrailNameTooShort(actual_length=len(self.trail_name))
if len(self.trail_name) > 128:
raise TrailNameTooLong(actual_length=len(self.trail_name))
if not re.match("^[0-9a-zA-Z]{1}.+$", self.trail_name):
raise TrailNameNotStartingCorrectly()
if not re.match(r".+[0-9a-zA-Z]{1}$", self.trail_name):
raise TrailNameNotEndingCorrectly()
if not re.match(r"^[.\-_0-9a-zA-Z]+$", self.trail_name):
raise TrailNameInvalidChars()
def check_bucket_exists(self):
from moto.s3 import s3_backend
try:
s3_backend.get_bucket(self.bucket_name)
except Exception:
raise S3BucketDoesNotExistException(
f"S3 bucket {self.bucket_name} does not exist!"
)
def check_topic_exists(self):
if self.sns_topic_name:
from moto.sns import sns_backends
sns_backend = sns_backends[self.region_name]
try:
sns_backend.get_topic(self.topic_arn)
except Exception:
raise InsufficientSnsTopicPolicyException(
"SNS Topic does not exist or the topic policy is incorrect!"
)
def start_logging(self):
self.status.start_logging()
def stop_logging(self):
self.status.stop_logging()
def short(self):
return {
"Name": self.trail_name,
"TrailARN": self.arn,
"HomeRegion": self.region_name,
}
def description(self, include_region=False):
desc = {
"Name": self.trail_name,
"S3BucketName": self.bucket_name,
"IncludeGlobalServiceEvents": True,
"IsMultiRegionTrail": self.is_multi_region,
"TrailARN": self.arn,
"LogFileValidationEnabled": self.log_validation,
"IsOrganizationTrail": self.is_org_trail,
"HasCustomEventSelectors": False,
"HasInsightSelectors": False,
}
if self.s3_key_prefix is not None:
desc["S3KeyPrefix"] = self.s3_key_prefix
if self.sns_topic_name is not None:
desc["SnsTopicName"] = self.sns_topic_name
desc["SnsTopicARN"] = self.topic_arn
if include_region:
desc["HomeRegion"] = self.region_name
return desc
class CloudTrailBackend(BaseBackend):
"""Implementation of CloudTrail APIs."""
def __init__(self, region_name):
self.region_name = region_name
self.trails = dict()
def create_trail(
self,
name,
bucket_name,
s3_key_prefix,
sns_topic_name,
is_multi_region,
log_validation,
is_org_trail,
):
trail = Trail(
self.region_name,
name,
bucket_name,
s3_key_prefix,
sns_topic_name,
is_multi_region,
log_validation,
is_org_trail,
)
self.trails[name] = trail
return trail
def get_trail(self, name):
if len(name) < 3:
raise TrailNameTooShort(actual_length=len(name))
if name not in self.trails:
raise TrailNotFoundException(name)
return self.trails[name]
def get_trail_status(self, name):
if len(name) < 3:
raise TrailNameTooShort(actual_length=len(name))
if name not in self.trails:
# This particular method returns the ARN as part of the error message
arn = f"arn:aws:cloudtrail:{self.region_name}:{ACCOUNT_ID}:trail/{name}"
raise TrailNotFoundException(name=arn)
trail = self.trails[name]
return trail.status
def describe_trails(self, include_shadow_trails):
all_trails = []
if include_shadow_trails:
for backend in cloudtrail_backends.values():
all_trails.extend(backend.trails.values())
else:
all_trails.extend(self.trails.values())
return all_trails
def list_trails(self):
return self.describe_trails(include_shadow_trails=True)
def start_logging(self, name):
trail = self.trails[name]
trail.start_logging()
def stop_logging(self, name):
trail = self.trails[name]
trail.stop_logging()
def delete_trail(self, name):
if name in self.trails:
del self.trails[name]
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
cloudtrail_backends = {}
for available_region in Session().get_available_regions("cloudtrail"):
cloudtrail_backends[available_region] = CloudTrailBackend(available_region)
for available_region in Session().get_available_regions(
"cloudtrail", partition_name="aws-us-gov"
):
cloudtrail_backends[available_region] = CloudTrailBackend(available_region)
for available_region in Session().get_available_regions(
"cloudtrail", partition_name="aws-cn"
):
cloudtrail_backends[available_region] = CloudTrailBackend(available_region)

View File

@ -0,0 +1,75 @@
"""Handles incoming cloudtrail requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import cloudtrail_backends
from .exceptions import InvalidParameterCombinationException
class CloudTrailResponse(BaseResponse):
"""Handler for CloudTrail requests and responses."""
@property
def cloudtrail_backend(self):
"""Return backend instance specific for this region."""
return cloudtrail_backends[self.region]
def create_trail(self):
name = self._get_param("Name")
bucket_name = self._get_param("S3BucketName")
is_global = self._get_bool_param("IncludeGlobalServiceEvents")
is_multi_region = self._get_bool_param("IsMultiRegionTrail", False)
if not is_global and is_multi_region:
raise InvalidParameterCombinationException(
"Multi-Region trail must include global service events."
)
s3_key_prefix = self._get_param("S3KeyPrefix")
sns_topic_name = self._get_param("SnsTopicName")
log_validation = self._get_bool_param("EnableLogFileValidation", False)
is_org_trail = self._get_bool_param("IsOrganizationTrail", False)
trail = self.cloudtrail_backend.create_trail(
name,
bucket_name,
s3_key_prefix,
sns_topic_name,
is_multi_region,
log_validation,
is_org_trail,
)
return json.dumps(trail.description())
def get_trail(self):
name = self._get_param("Name")
trail = self.cloudtrail_backend.get_trail(name)
return json.dumps({"Trail": trail.description()})
def get_trail_status(self):
name = self._get_param("Name")
status = self.cloudtrail_backend.get_trail_status(name)
return json.dumps(status.description())
def describe_trails(self):
include_shadow_trails = self._get_bool_param("includeShadowTrails", True)
trails = self.cloudtrail_backend.describe_trails(include_shadow_trails)
return json.dumps(
{"trailList": [t.description(include_region=True) for t in trails]}
)
def list_trails(self):
all_trails = self.cloudtrail_backend.list_trails()
return json.dumps({"Trails": [t.short() for t in all_trails]})
def start_logging(self):
name = self._get_param("Name")
self.cloudtrail_backend.start_logging(name)
return json.dumps({})
def stop_logging(self):
name = self._get_param("Name")
self.cloudtrail_backend.stop_logging(name)
return json.dumps({})
def delete_trail(self):
name = self._get_param("Name")
self.cloudtrail_backend.delete_trail(name)
return json.dumps({})

11
moto/cloudtrail/urls.py Normal file
View File

@ -0,0 +1,11 @@
"""cloudtrail base URL and path."""
from .responses import CloudTrailResponse
response = CloudTrailResponse()
url_bases = [
r"https?://cloudtrail\.(.+)\.amazonaws\.com",
]
url_paths = {"{0}/$": response.dispatch}

View File

@ -34,7 +34,7 @@ import boto3
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.core import BaseBackend from moto.core import BaseBackend
from inflection import singularize from inflection import singularize
from implementation_coverage import get_moto_implementation from .implementation_coverage import get_moto_implementation
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template") TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template")

View File

View File

@ -0,0 +1,439 @@
"""Unit tests for cloudtrail-supported APIs."""
import boto3
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from moto import mock_cloudtrail, mock_s3, mock_sns
from moto.core import ACCOUNT_ID
from uuid import uuid4
@mock_s3
@mock_cloudtrail
def test_create_trail_without_bucket():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.create_trail(
Name="mytrailname", S3BucketName="specificweirdbucketthatdoesnotexist"
)
err = exc.value.response["Error"]
err["Code"].should.equal("S3BucketDoesNotExistException")
err["Message"].should.equal(
"S3 bucket specificweirdbucketthatdoesnotexist does not exist!"
)
@pytest.mark.parametrize(
"name,message",
[
(
"a",
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters.",
),
(
"aa",
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 2 characters.",
),
(
"a" * 129,
"Trail name too long. Maximum allowed length: 128 characters. Specified name length: 129 characters.",
),
("trail!", "Trail name must ends with a letter or number."),
(
"my#trail",
"Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_).",
),
("-trail", "Trail name must starts with a letter or number."),
],
)
@mock_cloudtrail
def test_create_trail_invalid_name(name, message):
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.create_trail(
Name=name, S3BucketName="specificweirdbucketthatdoesnotexist"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(message)
@mock_cloudtrail
@mock_s3
def test_create_trail_simple():
bucket_name, resp, trail_name = create_trail_simple()
resp.should.have.key("Name").equal(trail_name)
resp.should.have.key("S3BucketName").equal(bucket_name)
resp.shouldnt.have.key("S3KeyPrefix")
resp.shouldnt.have.key("SnsTopicName")
resp.shouldnt.have.key("SnsTopicARN")
resp.should.have.key("IncludeGlobalServiceEvents").equal(True)
resp.should.have.key("IsMultiRegionTrail").equal(False)
resp.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("LogFileValidationEnabled").equal(False)
resp.should.have.key("IsOrganizationTrail").equal(False)
return resp
def create_trail_simple(region_name="us-east-1"):
client = boto3.client("cloudtrail", region_name=region_name)
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = str(uuid4())
s3.create_bucket(Bucket=bucket_name)
trail_name = str(uuid4())
resp = client.create_trail(Name=trail_name, S3BucketName=bucket_name)
return bucket_name, resp, trail_name
@mock_cloudtrail
def test_create_trail_multi_but_not_global():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.create_trail(
Name="mytrailname",
S3BucketName="non-existent",
IncludeGlobalServiceEvents=False,
IsMultiRegionTrail=True,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterCombinationException")
# Note that this validation occurs before the S3 bucket is validated
err["Message"].should.equal(
"Multi-Region trail must include global service events."
)
@mock_cloudtrail
@mock_s3
@mock_sns
def test_create_trail_with_nonexisting_topic():
client = boto3.client("cloudtrail", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = str(uuid4())
s3.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as exc:
client.create_trail(
Name="mytrailname",
S3BucketName=bucket_name,
SnsTopicName="nonexistingtopic",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InsufficientSnsTopicPolicyException")
err["Message"].should.equal(
"SNS Topic does not exist or the topic policy is incorrect!"
)
@mock_cloudtrail
@mock_s3
@mock_sns
def test_create_trail_advanced():
bucket_name, resp, sns_topic_name, trail_name = create_trail_advanced()
resp.should.have.key("Name").equal(trail_name)
resp.should.have.key("S3BucketName").equal(bucket_name)
resp.should.have.key("S3KeyPrefix").equal("s3kp")
resp.should.have.key("SnsTopicName").equal(sns_topic_name)
resp.should.have.key("SnsTopicARN").equal(
f"arn:aws:sns:us-east-1:{ACCOUNT_ID}:{sns_topic_name}"
)
resp.should.have.key("IncludeGlobalServiceEvents").equal(True)
resp.should.have.key("IsMultiRegionTrail").equal(True)
resp.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("LogFileValidationEnabled").equal(True)
resp.should.have.key("IsOrganizationTrail").equal(True)
return resp
def create_trail_advanced(region_name="us-east-1"):
client = boto3.client("cloudtrail", region_name=region_name)
s3 = boto3.client("s3", region_name="us-east-1")
sns = boto3.client("sns", region_name=region_name)
bucket_name = str(uuid4())
s3.create_bucket(Bucket=bucket_name)
sns_topic_name = "cloudtrailtopic"
sns.create_topic(Name=sns_topic_name)
trail_name = str(uuid4())
resp = client.create_trail(
Name=trail_name,
S3BucketName=bucket_name,
S3KeyPrefix="s3kp",
SnsTopicName=sns_topic_name,
IncludeGlobalServiceEvents=True,
IsMultiRegionTrail=True,
EnableLogFileValidation=True,
IsOrganizationTrail=True,
TagsList=[{"Key": "tk", "Value": "tv"}, {"Key": "tk2", "Value": "tv2"}],
)
return bucket_name, resp, sns_topic_name, trail_name
@mock_cloudtrail
def test_get_trail_with_one_char():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.get_trail(Name="?")
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
)
@mock_cloudtrail
def test_get_trail_unknown():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.get_trail(Name="unknowntrail")
err = exc.value.response["Error"]
err["Code"].should.equal("TrailNotFoundException")
err["Message"].should.equal(
f"Unknown trail: unknowntrail for the user: {ACCOUNT_ID}"
)
@mock_cloudtrail
def test_get_trail():
test_create_trail_simple()
client = boto3.client("cloudtrail", region_name="us-east-1")
_, trail1, name = create_trail_simple()
trail = client.get_trail(Name=name)["Trail"]
trail.should.have.key("Name").equal(name)
trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
trail.should.have.key("IsMultiRegionTrail").equal(False)
trail.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{name}"
)
@mock_cloudtrail
def test_get_trail_status_with_one_char():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.get_trail_status(Name="?")
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
)
@mock_cloudtrail
def test_get_trail_status_unknown_trail():
client = boto3.client("cloudtrail", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
client.get_trail_status(Name="unknowntrail")
err = exc.value.response["Error"]
err["Code"].should.equal("TrailNotFoundException")
err["Message"].should.equal(
f"Unknown trail: arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/unknowntrail for the user: {ACCOUNT_ID}"
)
@mock_cloudtrail
@mock_s3
def test_get_trail_status_inactive():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, _, trail_name = create_trail_simple()
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(False)
status.should.have.key("LatestDeliveryAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key("LatestDeliveryAttemptSucceeded").equal("")
status.should.have.key("TimeLoggingStarted").equal("")
status.should.have.key("TimeLoggingStopped").equal("")
status.shouldnt.have.key("StartLoggingTime")
@mock_cloudtrail
@mock_s3
def test_get_trail_status_after_starting():
client = boto3.client("cloudtrail", region_name="eu-west-3")
_, _, trail_name = create_trail_simple(region_name="eu-west-3")
client.start_logging(Name=trail_name)
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(True)
status.should.have.key("LatestDeliveryTime").be.a(datetime)
status.should.have.key("StartLoggingTime").be.a(datetime)
status.should.have.key(
"LatestDeliveryAttemptTime"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key(
"LatestDeliveryAttemptSucceeded"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("TimeLoggingStarted") # .equal("2021-10-13T15:02:21Z")
status.should.have.key("TimeLoggingStopped").equal("")
status.shouldnt.have.key("StopLoggingTime")
@mock_cloudtrail
@mock_s3
def test_get_trail_status_after_starting_and_stopping():
client = boto3.client("cloudtrail", region_name="eu-west-3")
_, _, trail_name = create_trail_simple(region_name="eu-west-3")
client.start_logging(Name=trail_name)
client.stop_logging(Name=trail_name)
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(False)
status.should.have.key("LatestDeliveryTime").be.a(datetime)
status.should.have.key("StartLoggingTime").be.a(datetime)
status.should.have.key("StopLoggingTime").be.a(datetime)
status.should.have.key(
"LatestDeliveryAttemptTime"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key(
"LatestDeliveryAttemptSucceeded"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("TimeLoggingStarted") # .equal("2021-10-13T15:02:21Z")
status.should.have.key("TimeLoggingStopped") # .equal("2021-10-13T15:03:21Z")
@mock_cloudtrail
@mock_s3
@mock_sns
def test_list_trails():
client = boto3.client("cloudtrail", region_name="eu-west-3")
_, trail1, _ = create_trail_simple()
_, trail2, _, _ = create_trail_advanced(region_name="ap-southeast-2")
_, trail3, _ = create_trail_simple(region_name="eu-west-1")
all_trails = client.list_trails()["Trails"]
all_trails.should.have.length_of(3)
for trail in all_trails:
set(trail.keys()).should.equal({"TrailARN", "Name", "HomeRegion"})
all_trails[0]["TrailARN"].should.equal(trail2["TrailARN"])
all_trails[0]["Name"].should.equal(trail2["Name"])
all_trails[0]["HomeRegion"].should.equal("ap-southeast-2")
all_trails[1]["TrailARN"].should.equal(trail3["TrailARN"])
all_trails[1]["Name"].should.equal(trail3["Name"])
all_trails[1]["HomeRegion"].should.equal("eu-west-1")
all_trails[2]["TrailARN"].should.equal(trail1["TrailARN"])
all_trails[2]["Name"].should.equal(trail1["Name"])
all_trails[2]["HomeRegion"].should.equal("us-east-1")
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_without_shadowtrails():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, trail1, _ = create_trail_simple()
_, trail2, _, _ = create_trail_advanced()
_, trail3, _ = create_trail_simple(region_name="eu-west-1")
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(3)
first_trail = [t for t in trails if t["Name"] == trail1["Name"]][0]
first_trail.should.have.key("Name").equal(trail1["Name"])
first_trail.should.have.key("S3BucketName").equal(trail1["S3BucketName"])
first_trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
first_trail.should.have.key("IsMultiRegionTrail").equal(False)
first_trail.should.have.key("HomeRegion").equal("us-east-1")
first_trail.should.have.key("LogFileValidationEnabled").equal(False)
first_trail.should.have.key("HasCustomEventSelectors").equal(False)
first_trail.should.have.key("HasInsightSelectors").equal(False)
first_trail.should.have.key("IsOrganizationTrail").equal(False)
first_trail.shouldnt.have.key("S3KeyPrefix")
first_trail.shouldnt.have.key("SnsTopicName")
first_trail.shouldnt.have.key("SnsTopicARN")
second_trail = [t for t in trails if t["Name"] == trail2["Name"]][0]
second_trail.should.have.key("Name").equal(trail2["Name"])
second_trail.should.have.key("S3BucketName").equal(trail2["S3BucketName"])
second_trail.should.have.key("S3KeyPrefix").equal(trail2["S3KeyPrefix"])
second_trail.should.have.key("SnsTopicName").equal(trail2["SnsTopicName"])
second_trail.should.have.key("SnsTopicARN").equal(trail2["SnsTopicARN"])
second_trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
second_trail.should.have.key("IsMultiRegionTrail").equal(True)
second_trail.should.have.key("HomeRegion").equal("us-east-1")
second_trail.should.have.key("LogFileValidationEnabled").equal(True)
second_trail.should.have.key("HasCustomEventSelectors").equal(False)
second_trail.should.have.key("HasInsightSelectors").equal(False)
second_trail.should.have.key("IsOrganizationTrail").equal(True)
third_trail = [t for t in trails if t["Name"] == trail3["Name"]][0]
third_trail.should.have.key("Name").equal(trail3["Name"])
third_trail.should.have.key("S3BucketName").equal(trail3["S3BucketName"])
third_trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
third_trail.should.have.key("IsMultiRegionTrail").equal(False)
third_trail.should.have.key("HomeRegion").equal("eu-west-1")
third_trail.should.have.key("LogFileValidationEnabled").equal(False)
third_trail.should.have.key("HasCustomEventSelectors").equal(False)
third_trail.should.have.key("HasInsightSelectors").equal(False)
third_trail.should.have.key("IsOrganizationTrail").equal(False)
third_trail.shouldnt.have.key("S3KeyPrefix")
third_trail.shouldnt.have.key("SnsTopicName")
third_trail.shouldnt.have.key("SnsTopicARN")
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_with_shadowtrails_true():
# Same behaviour as if shadowtrails-parameter was not supplied
client = boto3.client("cloudtrail", region_name="us-east-1")
_, trail1, _ = create_trail_simple()
_, trail2, _, _ = create_trail_advanced()
_, trail3, _ = create_trail_simple(region_name="eu-west-1")
trails = client.describe_trails(includeShadowTrails=True)["trailList"]
trails.should.have.length_of(3)
eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
trails = eu_client.describe_trails(includeShadowTrails=True)["trailList"]
trails.should.have.length_of(3)
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_with_shadowtrails_false():
# Only trails for the current region should now be returned
client = boto3.client("cloudtrail", region_name="us-east-1")
_, _, name1 = create_trail_simple()
_, _, _, name2 = create_trail_advanced()
_, _, name3 = create_trail_simple(region_name="eu-west-1")
trails = client.describe_trails(includeShadowTrails=False)["trailList"]
trails.should.have.length_of(2)
[t["Name"] for t in trails].should.equal([name1, name2])
eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
trails = eu_client.describe_trails(includeShadowTrails=False)["trailList"]
trails.should.have.length_of(1)
[t["Name"] for t in trails].should.equal([name3])
@mock_cloudtrail
@mock_s3
def test_delete_trail():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, _, name = create_trail_simple()
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(1)
client.delete_trail(Name=name)
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(0)

View File

@ -0,0 +1,19 @@
"""Test different server responses."""
import json
import sure # noqa
import moto.server as server
from moto import mock_cloudtrail
@mock_cloudtrail
def test_cloudtrail_list():
backend = server.create_backend_app("cloudtrail")
test_client = backend.test_client()
headers = {
"X-Amz-Target": "com.amazonaws.cloudtrail.v20131101.CloudTrail_20131101.ListTrails"
}
res = test_client.post("/", headers=headers)
data = json.loads(res.data)
data.should.equal({"Trails": []})