"""Unit tests for cloudtrail-supported APIs."""
|
|
import boto3
|
|
import pytest
|
|
import sure # noqa # pylint: disable=unused-import
|
|
|
|
from botocore.exceptions import ClientError
|
|
from datetime import datetime
|
|
from moto import mock_cloudtrail, mock_s3, mock_sns
|
|
from moto.core import ACCOUNT_ID
|
|
from uuid import uuid4
|
|
|
|
|
|
@mock_s3
@mock_cloudtrail
def test_create_trail_without_bucket():
    """Creating a trail whose S3 bucket does not exist raises a ClientError."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        cloudtrail.create_trail(
            Name="mytrailname",
            S3BucketName="specificweirdbucketthatdoesnotexist",
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("S3BucketDoesNotExistException")
    error["Message"].should.equal(
        "S3 bucket specificweirdbucketthatdoesnotexist does not exist!"
    )
|
|
|
|
|
|
@pytest.mark.parametrize(
    "name,message",
    [
        (
            "a",
            "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters.",
        ),
        (
            "aa",
            "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 2 characters.",
        ),
        (
            "a" * 129,
            "Trail name too long. Maximum allowed length: 128 characters. Specified name length: 129 characters.",
        ),
        ("trail!", "Trail name must ends with a letter or number."),
        (
            "my#trail",
            "Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_).",
        ),
        ("-trail", "Trail name must starts with a letter or number."),
    ],
)
@mock_cloudtrail
def test_create_trail_invalid_name(name, message):
    """Each invalid trail name is rejected with its specific error message.

    :param name: trail name passed to ``create_trail``
    :param message: error message expected in the ``ClientError`` response
    """
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        cloudtrail.create_trail(
            Name=name,
            S3BucketName="specificweirdbucketthatdoesnotexist",
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidTrailNameException")
    error["Message"].should.equal(message)
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_create_trail_simple():
    """Create a trail with only a name and bucket and check the response.

    :return: the ``create_trail`` response
    """
    bucket_name, response, trail_name = create_trail_simple()
    expected_arn = f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"

    response.should.have.key("Name").equal(trail_name)
    response.should.have.key("S3BucketName").equal(bucket_name)

    # Optional settings that were not supplied must not be echoed back
    for absent_key in ("S3KeyPrefix", "SnsTopicName", "SnsTopicARN"):
        response.shouldnt.have.key(absent_key)

    response.should.have.key("IncludeGlobalServiceEvents").equal(True)
    response.should.have.key("IsMultiRegionTrail").equal(False)
    response.should.have.key("TrailARN").equal(expected_arn)
    response.should.have.key("LogFileValidationEnabled").equal(False)
    response.should.have.key("IsOrganizationTrail").equal(False)
    return response
|
|
|
|
|
|
def create_trail_simple(region_name="us-east-1"):
    """Create a randomly named S3 bucket and a trail that targets it.

    :param region_name: region of the CloudTrail client that creates the trail
    :return: tuple of ``(bucket_name, create_trail response, trail_name)``
    """
    bucket_name = str(uuid4())
    # The bucket is always created through a us-east-1 S3 client,
    # independent of the region the trail is created in.
    boto3.client("s3", region_name="us-east-1").create_bucket(Bucket=bucket_name)

    trail_name = str(uuid4())
    cloudtrail = boto3.client("cloudtrail", region_name=region_name)
    response = cloudtrail.create_trail(Name=trail_name, S3BucketName=bucket_name)
    return bucket_name, response, trail_name
|
|
|
|
|
|
@mock_cloudtrail
def test_create_trail_multi_but_not_global():
    """A multi-region trail that excludes global service events is rejected."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    conflicting_args = {
        "Name": "mytrailname",
        "S3BucketName": "non-existent",
        "IncludeGlobalServiceEvents": False,
        "IsMultiRegionTrail": True,
    }

    with pytest.raises(ClientError) as raised:
        cloudtrail.create_trail(**conflicting_args)

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidParameterCombinationException")
    # The bucket does not exist either, yet this error is the one reported:
    # the parameter combination is validated before the S3 bucket.
    error["Message"].should.equal(
        "Multi-Region trail must include global service events."
    )
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_create_trail_with_nonexisting_topic():
    """Referencing an SNS topic that was never created raises a ClientError."""
    bucket_name = str(uuid4())
    boto3.client("s3", region_name="us-east-1").create_bucket(Bucket=bucket_name)
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        cloudtrail.create_trail(
            Name="mytrailname",
            S3BucketName=bucket_name,
            SnsTopicName="nonexistingtopic",
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("InsufficientSnsTopicPolicyException")
    error["Message"].should.equal(
        "SNS Topic does not exist or the topic policy is incorrect!"
    )
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_create_trail_advanced():
    """Create a trail with all optional settings and check the response.

    :return: the ``create_trail`` response
    """
    bucket_name, response, sns_topic_name, trail_name = create_trail_advanced()
    expected_topic_arn = f"arn:aws:sns:us-east-1:{ACCOUNT_ID}:{sns_topic_name}"
    expected_arn = f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"

    response.should.have.key("Name").equal(trail_name)
    response.should.have.key("S3BucketName").equal(bucket_name)
    response.should.have.key("S3KeyPrefix").equal("s3kp")
    response.should.have.key("SnsTopicName").equal(sns_topic_name)
    response.should.have.key("SnsTopicARN").equal(expected_topic_arn)
    response.should.have.key("IncludeGlobalServiceEvents").equal(True)
    response.should.have.key("IsMultiRegionTrail").equal(True)
    response.should.have.key("TrailARN").equal(expected_arn)
    response.should.have.key("LogFileValidationEnabled").equal(True)
    response.should.have.key("IsOrganizationTrail").equal(True)
    return response
|
|
|
|
|
|
def create_trail_advanced(region_name="us-east-1"):
    """Create a bucket, an SNS topic and a trail that uses every optional setting.

    :param region_name: region of the CloudTrail and SNS clients
    :return: tuple of
        ``(bucket_name, create_trail response, sns_topic_name, trail_name)``
    """
    cloudtrail = boto3.client("cloudtrail", region_name=region_name)

    # The bucket is always created through a us-east-1 S3 client,
    # independent of the region the trail is created in.
    bucket_name = str(uuid4())
    boto3.client("s3", region_name="us-east-1").create_bucket(Bucket=bucket_name)

    sns_topic_name = "cloudtrailtopic"
    boto3.client("sns", region_name=region_name).create_topic(Name=sns_topic_name)

    trail_name = str(uuid4())
    trail_settings = {
        "Name": trail_name,
        "S3BucketName": bucket_name,
        "S3KeyPrefix": "s3kp",
        "SnsTopicName": sns_topic_name,
        "IncludeGlobalServiceEvents": True,
        "IsMultiRegionTrail": True,
        "EnableLogFileValidation": True,
        "IsOrganizationTrail": True,
        "TagsList": [{"Key": "tk", "Value": "tv"}, {"Key": "tk2", "Value": "tv2"}],
    }
    response = cloudtrail.create_trail(**trail_settings)
    return bucket_name, response, sns_topic_name, trail_name
|
|
|
|
|
|
@mock_cloudtrail
def test_get_trail_with_one_char():
    """``get_trail`` with a one-character name reports the name as too short."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        cloudtrail.get_trail(Name="?")

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidTrailNameException")
    error["Message"].should.equal(
        "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
    )
|
|
|
|
|
|
@mock_cloudtrail
def test_get_trail_unknown():
    """``get_trail`` for a name that was never created raises TrailNotFound."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    expected_message = f"Unknown trail: unknowntrail for the user: {ACCOUNT_ID}"

    with pytest.raises(ClientError) as raised:
        cloudtrail.get_trail(Name="unknowntrail")

    error = raised.value.response["Error"]
    error["Code"].should.equal("TrailNotFoundException")
    error["Message"].should.equal(expected_message)
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_get_trail():
    """Fetch an existing trail by name and verify the returned attributes.

    ``create_trail_simple`` creates an S3 bucket, so the S3 mock is declared
    on this test directly rather than being borrowed from another test
    function's decorators.
    """
    # An additional trail, so the lookup by name has more than one candidate.
    # This uses the helper instead of invoking another test function.
    create_trail_simple()

    client = boto3.client("cloudtrail", region_name="us-east-1")
    _, _, name = create_trail_simple()

    trail = client.get_trail(Name=name)["Trail"]
    trail.should.have.key("Name").equal(name)
    trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
    trail.should.have.key("IsMultiRegionTrail").equal(False)
    trail.should.have.key("TrailARN").equal(
        f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{name}"
    )
|
|
|
|
|
|
@mock_cloudtrail
def test_get_trail_status_with_one_char():
    """``get_trail_status`` with a one-character name reports it as too short."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        cloudtrail.get_trail_status(Name="?")

    error = raised.value.response["Error"]
    error["Code"].should.equal("InvalidTrailNameException")
    error["Message"].should.equal(
        "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
    )
|
|
|
|
|
|
@mock_cloudtrail
def test_get_trail_status_unknown_trail():
    """``get_trail_status`` for an unknown name reports the full trail ARN."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    # Unlike the get_trail error, this message carries the ARN, not the bare name
    expected_message = f"Unknown trail: arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/unknowntrail for the user: {ACCOUNT_ID}"

    with pytest.raises(ClientError) as raised:
        cloudtrail.get_trail_status(Name="unknowntrail")

    error = raised.value.response["Error"]
    error["Code"].should.equal("TrailNotFoundException")
    error["Message"].should.equal(expected_message)
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_get_trail_status_inactive():
    """Status of a freshly created trail, looked up by name, before logging starts."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    _, _, trail_name = create_trail_simple()

    status = cloudtrail.get_trail_status(Name=trail_name)

    status.should.have.key("IsLogging").equal(False)
    # These fields are present but hold an empty string for a never-started trail
    for empty_field in (
        "LatestDeliveryAttemptTime",
        "LatestNotificationAttemptTime",
        "LatestNotificationAttemptSucceeded",
        "LatestDeliveryAttemptSucceeded",
        "TimeLoggingStarted",
        "TimeLoggingStopped",
    ):
        status.should.have.key(empty_field).equal("")
    status.shouldnt.have.key("StartLoggingTime")
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_get_trail_status_arn_inactive():
    """Status of a freshly created trail, looked up by ARN, before logging starts."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    _, created, _ = create_trail_simple()

    # The trail ARN is passed where the other status tests pass the trail name
    status = cloudtrail.get_trail_status(Name=created["TrailARN"])

    status.should.have.key("IsLogging").equal(False)
    # These fields are present but hold an empty string for a never-started trail
    for empty_field in (
        "LatestDeliveryAttemptTime",
        "LatestNotificationAttemptTime",
        "LatestNotificationAttemptSucceeded",
        "LatestDeliveryAttemptSucceeded",
        "TimeLoggingStarted",
        "TimeLoggingStopped",
    ):
        status.should.have.key(empty_field).equal("")
    status.shouldnt.have.key("StartLoggingTime")
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_get_trail_status_after_starting():
    """Status of a trail after ``start_logging`` has been called."""
    region = "eu-west-3"
    cloudtrail = boto3.client("cloudtrail", region_name=region)
    _, _, trail_name = create_trail_simple(region_name=region)

    cloudtrail.start_logging(Name=trail_name)

    status = cloudtrail.get_trail_status(Name=trail_name)
    status.should.have.key("IsLogging").equal(True)
    status.should.have.key("LatestDeliveryTime").be.a(datetime)
    status.should.have.key("StartLoggingTime").be.a(datetime)
    # Presence only: the value (e.g. "2021-10-13T15:36:53Z") is not pinned
    status.should.have.key("LatestDeliveryAttemptTime")
    status.should.have.key("LatestNotificationAttemptTime").equal("")
    status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
    # Presence only: the value (e.g. "2021-10-13T15:36:53Z") is not pinned
    status.should.have.key("LatestDeliveryAttemptSucceeded")
    # Presence only: the value (e.g. "2021-10-13T15:02:21Z") is not pinned
    status.should.have.key("TimeLoggingStarted")
    status.should.have.key("TimeLoggingStopped").equal("")
    status.shouldnt.have.key("StopLoggingTime")
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_get_trail_status_after_starting_and_stopping():
    """Status of a trail after ``start_logging`` followed by ``stop_logging``."""
    region = "eu-west-3"
    cloudtrail = boto3.client("cloudtrail", region_name=region)
    _, _, trail_name = create_trail_simple(region_name=region)

    cloudtrail.start_logging(Name=trail_name)
    cloudtrail.stop_logging(Name=trail_name)

    status = cloudtrail.get_trail_status(Name=trail_name)
    status.should.have.key("IsLogging").equal(False)
    for datetime_field in ("LatestDeliveryTime", "StartLoggingTime", "StopLoggingTime"):
        status.should.have.key(datetime_field).be.a(datetime)
    # Presence only: the value (e.g. "2021-10-13T15:36:53Z") is not pinned
    status.should.have.key("LatestDeliveryAttemptTime")
    status.should.have.key("LatestNotificationAttemptTime").equal("")
    status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
    # Presence only: the value (e.g. "2021-10-13T15:36:53Z") is not pinned
    status.should.have.key("LatestDeliveryAttemptSucceeded")
    # Presence only: the value (e.g. "2021-10-13T15:02:21Z") is not pinned
    status.should.have.key("TimeLoggingStarted")
    # Presence only: the value (e.g. "2021-10-13T15:03:21Z") is not pinned
    status.should.have.key("TimeLoggingStopped")
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_list_trails():
    """``list_trails`` returns trails from every region, with their home region."""
    cloudtrail = boto3.client("cloudtrail", region_name="eu-west-3")

    _, trail1, _ = create_trail_simple()
    _, trail2, _, _ = create_trail_advanced(region_name="ap-southeast-2")
    _, trail3, _ = create_trail_simple(region_name="eu-west-1")

    all_trails = cloudtrail.list_trails()["Trails"]
    all_trails.should.have.length_of(3)
    for listed in all_trails:
        set(listed.keys()).should.equal({"TrailARN", "Name", "HomeRegion"})

    # Expected position in the listing, paired with the region each trail was created in
    expected_order = [
        (trail2, "ap-southeast-2"),
        (trail3, "eu-west-1"),
        (trail1, "us-east-1"),
    ]
    for listed, (created, home_region) in zip(all_trails, expected_order):
        listed["TrailARN"].should.equal(created["TrailARN"])
        listed["Name"].should.equal(created["Name"])
        listed["HomeRegion"].should.equal(home_region)
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_without_shadowtrails():
    """``describe_trails`` without arguments returns all three trails in full."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    _, trail1, _ = create_trail_simple()
    _, trail2, _, _ = create_trail_advanced()
    _, trail3, _ = create_trail_simple(region_name="eu-west-1")

    trails = cloudtrail.describe_trails()["trailList"]
    trails.should.have.length_of(3)

    def described(created):
        # Pick the described trail that matches a create_trail response by name
        return [t for t in trails if t["Name"] == created["Name"]][0]

    def check_simple_trail(created, home_region):
        # Shared expectations for the two trails made by create_trail_simple
        trail = described(created)
        trail.should.have.key("Name").equal(created["Name"])
        trail.should.have.key("S3BucketName").equal(created["S3BucketName"])
        trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
        trail.should.have.key("IsMultiRegionTrail").equal(False)
        trail.should.have.key("HomeRegion").equal(home_region)
        trail.should.have.key("LogFileValidationEnabled").equal(False)
        trail.should.have.key("HasCustomEventSelectors").equal(False)
        trail.should.have.key("HasInsightSelectors").equal(False)
        trail.should.have.key("IsOrganizationTrail").equal(False)
        for absent_key in ("S3KeyPrefix", "SnsTopicName", "SnsTopicARN"):
            trail.shouldnt.have.key(absent_key)

    check_simple_trail(trail1, "us-east-1")

    advanced = described(trail2)
    # These values must match what create_trail returned for the advanced trail
    for echoed_key in (
        "Name",
        "S3BucketName",
        "S3KeyPrefix",
        "SnsTopicName",
        "SnsTopicARN",
    ):
        advanced.should.have.key(echoed_key).equal(trail2[echoed_key])
    advanced.should.have.key("IncludeGlobalServiceEvents").equal(True)
    advanced.should.have.key("IsMultiRegionTrail").equal(True)
    advanced.should.have.key("HomeRegion").equal("us-east-1")
    advanced.should.have.key("LogFileValidationEnabled").equal(True)
    advanced.should.have.key("HasCustomEventSelectors").equal(False)
    advanced.should.have.key("HasInsightSelectors").equal(False)
    advanced.should.have.key("IsOrganizationTrail").equal(True)

    check_simple_trail(trail3, "eu-west-1")
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_with_shadowtrails_true():
    """With ``includeShadowTrails=True`` every region's client sees all trails."""
    # Same behaviour as if the shadowtrails parameter was not supplied
    create_trail_simple()
    create_trail_advanced()
    create_trail_simple(region_name="eu-west-1")

    us_client = boto3.client("cloudtrail", region_name="us-east-1")
    us_trails = us_client.describe_trails(includeShadowTrails=True)["trailList"]
    us_trails.should.have.length_of(3)

    eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
    eu_trails = eu_client.describe_trails(includeShadowTrails=True)["trailList"]
    eu_trails.should.have.length_of(3)
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
@mock_sns
def test_describe_trails_with_shadowtrails_false():
    """With ``includeShadowTrails=False`` each client sees only its own region's trails."""
    # Only trails for the client's region should now be returned
    us_client = boto3.client("cloudtrail", region_name="us-east-1")
    _, _, name1 = create_trail_simple()
    _, _, _, name2 = create_trail_advanced()
    _, _, name3 = create_trail_simple(region_name="eu-west-1")

    us_trails = us_client.describe_trails(includeShadowTrails=False)["trailList"]
    us_trails.should.have.length_of(2)
    us_names = [t["Name"] for t in us_trails]
    us_names.should.equal([name1, name2])

    eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
    eu_trails = eu_client.describe_trails(includeShadowTrails=False)["trailList"]
    eu_trails.should.have.length_of(1)
    eu_names = [t["Name"] for t in eu_trails]
    eu_names.should.equal([name3])
|
|
|
|
|
|
@mock_cloudtrail
@mock_s3
def test_delete_trail():
    """A deleted trail no longer shows up in ``describe_trails``."""
    cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")
    _, _, name = create_trail_simple()

    before = cloudtrail.describe_trails()["trailList"]
    before.should.have.length_of(1)

    cloudtrail.delete_trail(Name=name)

    after = cloudtrail.describe_trails()["trailList"]
    after.should.have.length_of(0)
|