Techdebt: Replace sure with regular asserts in CloudTrail (#6489)

This commit is contained in:
Bert Blommers 2023-07-06 14:48:33 +00:00 committed by GitHub
parent 13e4a08dc6
commit c2f000d496
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 240 additions and 259 deletions

View File

@ -1,7 +1,6 @@
"""Unit tests for cloudtrail-supported APIs."""
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from datetime import datetime
@ -19,9 +18,10 @@ def test_create_trail_without_bucket():
Name="mytrailname", S3BucketName="specificweirdbucketthatdoesnotexist"
)
err = exc.value.response["Error"]
err["Code"].should.equal("S3BucketDoesNotExistException")
err["Message"].should.equal(
"S3 bucket specificweirdbucketthatdoesnotexist does not exist!"
assert err["Code"] == "S3BucketDoesNotExistException"
assert (
err["Message"]
== "S3 bucket specificweirdbucketthatdoesnotexist does not exist!"
)
@ -56,26 +56,27 @@ def test_create_trail_invalid_name(name, message):
Name=name, S3BucketName="specificweirdbucketthatdoesnotexist"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(message)
assert err["Code"] == "InvalidTrailNameException"
assert err["Message"] == message
@mock_cloudtrail
@mock_s3
def test_create_trail_simple():
bucket_name, resp, trail_name = create_trail_simple()
resp.should.have.key("Name").equal(trail_name)
resp.should.have.key("S3BucketName").equal(bucket_name)
resp.shouldnt.have.key("S3KeyPrefix")
resp.shouldnt.have.key("SnsTopicName")
resp.shouldnt.have.key("SnsTopicARN")
resp.should.have.key("IncludeGlobalServiceEvents").equal(True)
resp.should.have.key("IsMultiRegionTrail").equal(False)
resp.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
assert resp["Name"] == trail_name
assert resp["S3BucketName"] == bucket_name
assert "S3KeyPrefix" not in resp
assert "SnsTopicName" not in resp
assert "SnsTopicARN" not in resp
assert resp["IncludeGlobalServiceEvents"] is True
assert resp["IsMultiRegionTrail"] is False
assert (
resp["TrailARN"]
== f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("LogFileValidationEnabled").equal(False)
resp.should.have.key("IsOrganizationTrail").equal(False)
assert resp["LogFileValidationEnabled"] is False
assert resp["IsOrganizationTrail"] is False
def create_trail_simple(region_name="us-east-1"):
@ -99,11 +100,9 @@ def test_create_trail_multi_but_not_global():
IsMultiRegionTrail=True,
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterCombinationException")
assert err["Code"] == "InvalidParameterCombinationException"
# Note that this validation occurs before the S3 bucket is validated
err["Message"].should.equal(
"Multi-Region trail must include global service events."
)
assert err["Message"] == "Multi-Region trail must include global service events."
@mock_cloudtrail
@ -121,9 +120,9 @@ def test_create_trail_with_nonexisting_topic():
SnsTopicName="nonexistingtopic",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InsufficientSnsTopicPolicyException")
err["Message"].should.equal(
"SNS Topic does not exist or the topic policy is incorrect!"
assert err["Code"] == "InsufficientSnsTopicPolicyException"
assert (
err["Message"] == "SNS Topic does not exist or the topic policy is incorrect!"
)
@ -132,23 +131,22 @@ def test_create_trail_with_nonexisting_topic():
@mock_sns
def test_create_trail_advanced():
bucket_name, resp, sns_topic_name, trail_name = create_trail_advanced()
resp.should.have.key("Name").equal(trail_name)
resp.should.have.key("S3BucketName").equal(bucket_name)
resp.should.have.key("S3KeyPrefix").equal("s3kp")
resp.should.have.key("SnsTopicName").equal(sns_topic_name)
resp.should.have.key("SnsTopicARN").equal(
f"arn:aws:sns:us-east-1:{ACCOUNT_ID}:{sns_topic_name}"
assert resp["Name"] == trail_name
assert resp["S3BucketName"] == bucket_name
assert resp["S3KeyPrefix"] == "s3kp"
assert resp["SnsTopicName"] == sns_topic_name
assert resp["SnsTopicARN"] == f"arn:aws:sns:us-east-1:{ACCOUNT_ID}:{sns_topic_name}"
assert resp["IncludeGlobalServiceEvents"] is True
assert resp["IsMultiRegionTrail"] is True
assert (
resp["TrailARN"]
== f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("IncludeGlobalServiceEvents").equal(True)
resp.should.have.key("IsMultiRegionTrail").equal(True)
resp.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("LogFileValidationEnabled").equal(True)
resp.should.have.key("IsOrganizationTrail").equal(True)
resp.should.have.key("CloudWatchLogsLogGroupArn").equals("cwllga")
resp.should.have.key("CloudWatchLogsRoleArn").equals("cwlra")
resp.should.have.key("KmsKeyId").equals("kki")
assert resp["LogFileValidationEnabled"] is True
assert resp["IsOrganizationTrail"] is True
assert resp["CloudWatchLogsLogGroupArn"] == "cwllga"
assert resp["CloudWatchLogsRoleArn"] == "cwlra"
assert resp["KmsKeyId"] == "kki"
def create_trail_advanced(region_name="us-east-1"):
@ -183,9 +181,10 @@ def test_get_trail_with_one_char():
with pytest.raises(ClientError) as exc:
client.get_trail(Name="?")
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
assert err["Code"] == "InvalidTrailNameException"
assert (
err["Message"]
== "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
)
@ -195,10 +194,8 @@ def test_get_trail_unknown():
with pytest.raises(ClientError) as exc:
client.get_trail(Name="unknowntrail")
err = exc.value.response["Error"]
err["Code"].should.equal("TrailNotFoundException")
err["Message"].should.equal(
f"Unknown trail: unknowntrail for the user: {ACCOUNT_ID}"
)
assert err["Code"] == "TrailNotFoundException"
assert err["Message"] == f"Unknown trail: unknowntrail for the user: {ACCOUNT_ID}"
@mock_cloudtrail
@ -208,11 +205,11 @@ def test_get_trail():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, _, name = create_trail_simple()
trail = client.get_trail(Name=name)["Trail"]
trail.should.have.key("Name").equal(name)
trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
trail.should.have.key("IsMultiRegionTrail").equal(False)
trail.should.have.key("TrailARN").equal(
f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{name}"
assert trail["Name"] == name
assert trail["IncludeGlobalServiceEvents"] is True
assert trail["IsMultiRegionTrail"] is False
assert (
trail["TrailARN"] == f"arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/{name}"
)
@ -222,9 +219,10 @@ def test_get_trail_status_with_one_char():
with pytest.raises(ClientError) as exc:
client.get_trail_status(Name="?")
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidTrailNameException")
err["Message"].should.equal(
"Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
assert err["Code"] == "InvalidTrailNameException"
assert (
err["Message"]
== "Trail name too short. Minimum allowed length: 3 characters. Specified name length: 1 characters."
)
@ -234,9 +232,10 @@ def test_get_trail_status_unknown_trail():
with pytest.raises(ClientError) as exc:
client.get_trail_status(Name="unknowntrail")
err = exc.value.response["Error"]
err["Code"].should.equal("TrailNotFoundException")
err["Message"].should.equal(
f"Unknown trail: arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/unknowntrail for the user: {ACCOUNT_ID}"
assert err["Code"] == "TrailNotFoundException"
assert (
err["Message"]
== f"Unknown trail: arn:aws:cloudtrail:us-east-1:{ACCOUNT_ID}:trail/unknowntrail for the user: {ACCOUNT_ID}"
)
@ -246,14 +245,14 @@ def test_get_trail_status_inactive():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, _, trail_name = create_trail_simple()
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(False)
status.should.have.key("LatestDeliveryAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key("LatestDeliveryAttemptSucceeded").equal("")
status.should.have.key("TimeLoggingStarted").equal("")
status.should.have.key("TimeLoggingStopped").equal("")
status.shouldnt.have.key("StartLoggingTime")
assert status["IsLogging"] is False
assert status["LatestDeliveryAttemptTime"] == ""
assert status["LatestNotificationAttemptTime"] == ""
assert status["LatestNotificationAttemptSucceeded"] == ""
assert status["LatestDeliveryAttemptSucceeded"] == ""
assert status["TimeLoggingStarted"] == ""
assert status["TimeLoggingStopped"] == ""
assert "StartLoggingTime" not in status
@mock_cloudtrail
@ -262,14 +261,14 @@ def test_get_trail_status_arn_inactive():
client = boto3.client("cloudtrail", region_name="us-east-1")
_, resp, _ = create_trail_simple()
status = client.get_trail_status(Name=resp["TrailARN"])
status.should.have.key("IsLogging").equal(False)
status.should.have.key("LatestDeliveryAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key("LatestDeliveryAttemptSucceeded").equal("")
status.should.have.key("TimeLoggingStarted").equal("")
status.should.have.key("TimeLoggingStopped").equal("")
status.shouldnt.have.key("StartLoggingTime")
assert status["IsLogging"] is False
assert status["LatestDeliveryAttemptTime"] == ""
assert status["LatestNotificationAttemptTime"] == ""
assert status["LatestNotificationAttemptSucceeded"] == ""
assert status["LatestDeliveryAttemptSucceeded"] == ""
assert status["TimeLoggingStarted"] == ""
assert status["TimeLoggingStopped"] == ""
assert "StartLoggingTime" not in status
@mock_cloudtrail
@ -281,20 +280,18 @@ def test_get_trail_status_after_starting():
client.start_logging(Name=trail_name)
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(True)
status.should.have.key("LatestDeliveryTime").be.a(datetime)
status.should.have.key("StartLoggingTime").be.a(datetime)
status.should.have.key(
"LatestDeliveryAttemptTime"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key(
"LatestDeliveryAttemptSucceeded"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("TimeLoggingStarted") # .equal("2021-10-13T15:02:21Z")
status.should.have.key("TimeLoggingStopped").equal("")
status.shouldnt.have.key("StopLoggingTime")
assert status["IsLogging"] is True
assert isinstance(status["LatestDeliveryTime"], datetime)
assert isinstance(status["StartLoggingTime"], datetime)
# .equal("2021-10-13T15:36:53Z")
assert "LatestDeliveryAttemptTime" in status
assert status["LatestNotificationAttemptTime"] == ""
assert status["LatestNotificationAttemptSucceeded"] == ""
# .equal("2021-10-13T15:36:53Z")
assert "LatestDeliveryAttemptSucceeded" in status
assert "TimeLoggingStarted" in status # .equal("2021-10-13T15:02:21Z")
assert status["TimeLoggingStopped"] == ""
assert "StopLoggingTime" not in status
@mock_cloudtrail
@ -308,20 +305,18 @@ def test_get_trail_status_after_starting_and_stopping():
client.stop_logging(Name=trail_name)
status = client.get_trail_status(Name=trail_name)
status.should.have.key("IsLogging").equal(False)
status.should.have.key("LatestDeliveryTime").be.a(datetime)
status.should.have.key("StartLoggingTime").be.a(datetime)
status.should.have.key("StopLoggingTime").be.a(datetime)
status.should.have.key(
"LatestDeliveryAttemptTime"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("LatestNotificationAttemptTime").equal("")
status.should.have.key("LatestNotificationAttemptSucceeded").equal("")
status.should.have.key(
"LatestDeliveryAttemptSucceeded"
) # .equal("2021-10-13T15:36:53Z")
status.should.have.key("TimeLoggingStarted") # .equal("2021-10-13T15:02:21Z")
status.should.have.key("TimeLoggingStopped") # .equal("2021-10-13T15:03:21Z")
assert status["IsLogging"] is False
assert isinstance(status["LatestDeliveryTime"], datetime)
assert isinstance(status["StartLoggingTime"], datetime)
assert isinstance(status["StopLoggingTime"], datetime)
# .equal("2021-10-13T15:36:53Z")
assert "LatestDeliveryAttemptTime" in status
assert status["LatestNotificationAttemptTime"] == ""
assert status["LatestNotificationAttemptSucceeded"] == ""
# .equal("2021-10-13T15:36:53Z")
assert "LatestDeliveryAttemptSucceeded" in status
assert "TimeLoggingStarted" in status # .equal("2021-10-13T15:02:21Z")
assert "TimeLoggingStopped" in status # .equal("2021-10-13T15:03:21Z")
@mock_cloudtrail
@ -335,16 +330,15 @@ def test_list_trails_different_home_region_one_multiregion():
create_trail_simple(region_name="eu-west-1")
all_trails = client.list_trails()["Trails"]
all_trails.should.have.length_of(1)
# Only the Trail created in the ap-southeast-2 is MultiRegion
all_trails.should.contain(
assert all_trails == [
{
"TrailARN": trail2["TrailARN"],
"Name": trail2["Name"],
"HomeRegion": "ap-southeast-2",
}
)
]
@mock_cloudtrail
@ -360,7 +354,7 @@ def test_list_trails_different_home_region_no_multiregion():
all_trails = client.list_trails()["Trails"]
# Since there is no MultiRegion Trail created
# the eu-west-3 has no Trails
all_trails.should.have.length_of(0)
assert len(all_trails) == 0
@mock_cloudtrail
@ -375,35 +369,35 @@ def test_describe_trails_without_shadowtrails():
# There are two Trails created in the us-east-1
# one MultiRegion and the other is not MultiRegion
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(2)
assert len(trails) == 2
first_trail = [t for t in trails if t["Name"] == trail1["Name"]][0]
first_trail.should.have.key("Name").equal(trail1["Name"])
first_trail.should.have.key("S3BucketName").equal(trail1["S3BucketName"])
first_trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
first_trail.should.have.key("IsMultiRegionTrail").equal(False)
first_trail.should.have.key("HomeRegion").equal("us-east-1")
first_trail.should.have.key("LogFileValidationEnabled").equal(False)
first_trail.should.have.key("HasCustomEventSelectors").equal(False)
first_trail.should.have.key("HasInsightSelectors").equal(False)
first_trail.should.have.key("IsOrganizationTrail").equal(False)
first_trail.shouldnt.have.key("S3KeyPrefix")
first_trail.shouldnt.have.key("SnsTopicName")
first_trail.shouldnt.have.key("SnsTopicARN")
assert first_trail["Name"] == trail1["Name"]
assert first_trail["S3BucketName"] == trail1["S3BucketName"]
assert first_trail["IncludeGlobalServiceEvents"] is True
assert first_trail["IsMultiRegionTrail"] is False
assert first_trail["HomeRegion"] == "us-east-1"
assert first_trail["LogFileValidationEnabled"] is False
assert first_trail["HasCustomEventSelectors"] is False
assert first_trail["HasInsightSelectors"] is False
assert first_trail["IsOrganizationTrail"] is False
assert "S3KeyPrefix" not in first_trail
assert "SnsTopicName" not in first_trail
assert "SnsTopicARN" not in first_trail
second_trail = [t for t in trails if t["Name"] == trail2["Name"]][0]
second_trail.should.have.key("Name").equal(trail2["Name"])
second_trail.should.have.key("S3BucketName").equal(trail2["S3BucketName"])
second_trail.should.have.key("S3KeyPrefix").equal(trail2["S3KeyPrefix"])
second_trail.should.have.key("SnsTopicName").equal(trail2["SnsTopicName"])
second_trail.should.have.key("SnsTopicARN").equal(trail2["SnsTopicARN"])
second_trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
second_trail.should.have.key("IsMultiRegionTrail").equal(True)
second_trail.should.have.key("HomeRegion").equal("us-east-1")
second_trail.should.have.key("LogFileValidationEnabled").equal(True)
second_trail.should.have.key("HasCustomEventSelectors").equal(False)
second_trail.should.have.key("HasInsightSelectors").equal(False)
second_trail.should.have.key("IsOrganizationTrail").equal(True)
assert second_trail["Name"] == trail2["Name"]
assert second_trail["S3BucketName"] == trail2["S3BucketName"]
assert second_trail["S3KeyPrefix"] == trail2["S3KeyPrefix"]
assert second_trail["SnsTopicName"] == trail2["SnsTopicName"]
assert second_trail["SnsTopicARN"] == trail2["SnsTopicARN"]
assert second_trail["IncludeGlobalServiceEvents"] is True
assert second_trail["IsMultiRegionTrail"] is True
assert second_trail["HomeRegion"] == "us-east-1"
assert second_trail["LogFileValidationEnabled"] is True
assert second_trail["HasCustomEventSelectors"] is False
assert second_trail["HasInsightSelectors"] is False
assert second_trail["IsOrganizationTrail"] is True
@mock_cloudtrail
@ -419,14 +413,14 @@ def test_describe_trails_with_shadowtrails_true():
# There are two Trails created in the us-east-1
# one MultiRegion and the other is not MultiRegion
trails = client.describe_trails(includeShadowTrails=True)["trailList"]
trails.should.have.length_of(2)
assert len(trails) == 2
# There are two Trails in the eu-west-1
# one MultiRegion (created in the us-east-1)
# and another not MultiRegion created in the us-east-1
eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
trails = eu_client.describe_trails(includeShadowTrails=True)["trailList"]
trails.should.have.length_of(2)
assert len(trails) == 2
@mock_cloudtrail
@ -440,13 +434,13 @@ def test_describe_trails_with_shadowtrails_false():
_, _, name3 = create_trail_simple(region_name="eu-west-1")
trails = client.describe_trails(includeShadowTrails=False)["trailList"]
trails.should.have.length_of(2)
[t["Name"] for t in trails].should.equal([name1, name2])
assert len(trails) == 2
assert [t["Name"] for t in trails] == [name1, name2]
eu_client = boto3.client("cloudtrail", region_name="eu-west-1")
trails = eu_client.describe_trails(includeShadowTrails=False)["trailList"]
trails.should.have.length_of(1)
[t["Name"] for t in trails].should.equal([name3])
assert len(trails) == 1
assert [t["Name"] for t in trails] == [name3]
@mock_cloudtrail
@ -456,12 +450,12 @@ def test_delete_trail():
_, _, name = create_trail_simple()
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(1)
assert len(trails) == 1
client.delete_trail(Name=name)
trails = client.describe_trails()["trailList"]
trails.should.have.length_of(0)
assert len(trails) == 0
@mock_cloudtrail
@ -470,26 +464,26 @@ def test_update_trail_simple():
client = boto3.client("cloudtrail", region_name="ap-southeast-2")
bucket_name, trail, name = create_trail_simple(region_name="ap-southeast-2")
resp = client.update_trail(Name=name)
resp.should.have.key("Name").equal(name)
resp.should.have.key("S3BucketName").equal(bucket_name)
resp.should.have.key("IncludeGlobalServiceEvents").equal(True)
resp.should.have.key("IsMultiRegionTrail").equal(False)
resp.should.have.key("LogFileValidationEnabled").equal(False)
resp.should.have.key("IsOrganizationTrail").equal(False)
resp.shouldnt.have.key("S3KeyPrefix")
resp.shouldnt.have.key("SnsTopicName")
resp.shouldnt.have.key("SnsTopicARN")
assert resp["Name"] == name
assert resp["S3BucketName"] == bucket_name
assert resp["IncludeGlobalServiceEvents"] is True
assert resp["IsMultiRegionTrail"] is False
assert resp["LogFileValidationEnabled"] is False
assert resp["IsOrganizationTrail"] is False
assert "S3KeyPrefix" not in resp
assert "SnsTopicName" not in resp
assert "SnsTopicARN" not in resp
trail = client.get_trail(Name=name)["Trail"]
trail.should.have.key("Name").equal(name)
trail.should.have.key("S3BucketName").equal(bucket_name)
trail.should.have.key("IncludeGlobalServiceEvents").equal(True)
trail.should.have.key("IsMultiRegionTrail").equal(False)
trail.should.have.key("LogFileValidationEnabled").equal(False)
trail.should.have.key("IsOrganizationTrail").equal(False)
trail.shouldnt.have.key("S3KeyPrefix")
trail.shouldnt.have.key("SnsTopicName")
trail.shouldnt.have.key("SnsTopicARN")
assert trail["Name"] == name
assert trail["S3BucketName"] == bucket_name
assert trail["IncludeGlobalServiceEvents"] is True
assert trail["IsMultiRegionTrail"] is False
assert trail["LogFileValidationEnabled"] is False
assert trail["IsOrganizationTrail"] is False
assert "S3KeyPrefix" not in trail
assert "SnsTopicName" not in trail
assert "SnsTopicARN" not in trail
@mock_cloudtrail
@ -510,24 +504,24 @@ def test_update_trail_full():
KmsKeyId="kki",
IsOrganizationTrail=True,
)
resp.should.have.key("Name").equal(name)
resp.should.have.key("S3BucketName").equal("updated_bucket")
resp.should.have.key("S3KeyPrefix").equals("s3kp")
resp.should.have.key("SnsTopicName").equals("stn")
resp.should.have.key("IncludeGlobalServiceEvents").equal(False)
resp.should.have.key("IsMultiRegionTrail").equal(True)
resp.should.have.key("LogFileValidationEnabled").equal(True)
resp.should.have.key("IsOrganizationTrail").equal(True)
assert resp["Name"] == name
assert resp["S3BucketName"] == "updated_bucket"
assert resp["S3KeyPrefix"] == "s3kp"
assert resp["SnsTopicName"] == "stn"
assert resp["IncludeGlobalServiceEvents"] is False
assert resp["IsMultiRegionTrail"] is True
assert resp["LogFileValidationEnabled"] is True
assert resp["IsOrganizationTrail"] is True
trail = client.get_trail(Name=name)["Trail"]
trail.should.have.key("Name").equal(name)
trail.should.have.key("S3BucketName").equal("updated_bucket")
trail.should.have.key("S3KeyPrefix").equals("s3kp")
trail.should.have.key("SnsTopicName").equals("stn")
trail.should.have.key("IncludeGlobalServiceEvents").equal(False)
trail.should.have.key("IsMultiRegionTrail").equal(True)
trail.should.have.key("LogFileValidationEnabled").equal(True)
trail.should.have.key("IsOrganizationTrail").equal(True)
trail.should.have.key("CloudWatchLogsLogGroupArn").equals("cwllga")
trail.should.have.key("CloudWatchLogsRoleArn").equals("cwlra")
trail.should.have.key("KmsKeyId").equals("kki")
assert trail["Name"] == name
assert trail["S3BucketName"] == "updated_bucket"
assert trail["S3KeyPrefix"] == "s3kp"
assert trail["SnsTopicName"] == "stn"
assert trail["IncludeGlobalServiceEvents"] is False
assert trail["IsMultiRegionTrail"] is True
assert trail["LogFileValidationEnabled"] is True
assert trail["IsOrganizationTrail"] is True
assert trail["CloudWatchLogsLogGroupArn"] == "cwllga"
assert trail["CloudWatchLogsRoleArn"] == "cwlra"
assert trail["KmsKeyId"] == "kki"

View File

@ -26,19 +26,17 @@ def test_put_event_selectors():
],
)
resp.should.have.key("TrailARN")
resp.should.have.key("EventSelectors").equals(
[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
)
resp.shouldnt.have.key("AdvancedEventSelectors")
assert "TrailARN" in resp
assert resp["EventSelectors"] == [
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
assert "AdvancedEventSelectors" not in resp
@mock_cloudtrail
@ -63,21 +61,19 @@ def test_put_event_selectors_advanced():
],
)
resp.should.have.key("TrailARN")
resp.should.have.key("EventSelectors").equals(
[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
)
resp.should.have.key("AdvancedEventSelectors").equals(
[{"Name": "aes1", "FieldSelectors": [{"Field": "f", "Equals": ["fs1"]}]}]
)
assert "TrailARN" in resp
assert resp["EventSelectors"] == [
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
assert resp["AdvancedEventSelectors"] == [
{"Name": "aes1", "FieldSelectors": [{"Field": "f", "Equals": ["fs1"]}]}
]
@mock_cloudtrail
@ -88,11 +84,12 @@ def test_get_event_selectors_empty():
resp = client.get_event_selectors(TrailName=trail_name)
resp.should.have.key("TrailARN").equals(
f"arn:aws:cloudtrail:ap-southeast-1:{ACCOUNT_ID}:trail/{trail_name}"
assert (
resp["TrailARN"]
== f"arn:aws:cloudtrail:ap-southeast-1:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("EventSelectors").equals([])
resp.should.have.key("AdvancedEventSelectors").equals([])
assert resp["EventSelectors"] == []
assert resp["AdvancedEventSelectors"] == []
@mock_cloudtrail
@ -116,20 +113,19 @@ def test_get_event_selectors():
resp = client.get_event_selectors(TrailName=trail_name)
resp.should.have.key("TrailARN").equals(
f"arn:aws:cloudtrail:ap-southeast-2:{ACCOUNT_ID}:trail/{trail_name}"
)
resp.should.have.key("EventSelectors").equals(
[
{
"ReadWriteType": "All",
"IncludeManagementEvents": False,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
assert (
resp["TrailARN"]
== f"arn:aws:cloudtrail:ap-southeast-2:{ACCOUNT_ID}:trail/{trail_name}"
)
assert resp["EventSelectors"] == [
{
"ReadWriteType": "All",
"IncludeManagementEvents": False,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
]
@mock_cloudtrail
@ -160,13 +156,12 @@ def test_get_event_selectors_multiple():
resp = client.get_event_selectors(TrailName=trail_name)
resp.should.have.key("TrailARN")
assert "TrailARN" in resp
# Setting advanced selectors cancels any existing event selectors
resp.should.have.key("EventSelectors").equals([])
resp.should.have.key("AdvancedEventSelectors").length_of(1)
resp.should.have.key("AdvancedEventSelectors").equals(
[{"Name": "aes1", "FieldSelectors": [{"Field": "f", "Equals": ["fs1"]}]}]
)
assert resp["EventSelectors"] == []
assert resp["AdvancedEventSelectors"] == [
{"Name": "aes1", "FieldSelectors": [{"Field": "f", "Equals": ["fs1"]}]}
]
@mock_cloudtrail
@ -180,10 +175,8 @@ def test_put_insight_selectors(using_arn):
TrailName=trail_name, InsightSelectors=[{"InsightType": "ApiCallRateInsight"}]
)
resp.should.have.key("TrailARN")
resp.should.have.key("InsightSelectors").equals(
[{"InsightType": "ApiCallRateInsight"}]
)
assert "TrailARN" in resp
assert resp["InsightSelectors"] == [{"InsightType": "ApiCallRateInsight"}]
if using_arn:
trail_arn = resp["TrailARN"]
@ -191,10 +184,8 @@ def test_put_insight_selectors(using_arn):
else:
resp = client.get_insight_selectors(TrailName=trail_name)
resp.should.have.key("TrailARN")
resp.should.have.key("InsightSelectors").equals(
[{"InsightType": "ApiCallRateInsight"}]
)
assert "TrailARN" in resp
assert resp["InsightSelectors"] == [{"InsightType": "ApiCallRateInsight"}]
@mock_cloudtrail
@ -204,5 +195,5 @@ def test_get_insight_selectors():
_, resp, trail_name = create_trail_simple(region_name="eu-west-1")
resp = client.get_insight_selectors(TrailName=trail_name)
resp.should.have.key("TrailARN")
resp.shouldnt.have.key("InsightSelectors")
assert "TrailARN" in resp
assert "InsightSelectors" not in resp

View File

@ -15,10 +15,11 @@ def test_add_tags():
client.add_tags(ResourceId=trail_arn, TagsList=[{"Key": "k1", "Value": "v1"}])
resp = client.list_tags(ResourceIdList=[trail_arn])
resp.should.have.key("ResourceTagList").length_of(1)
resp["ResourceTagList"][0].should.equal(
{"ResourceId": trail_arn, "TagsList": [{"Key": "k1", "Value": "v1"}]}
)
assert len(resp["ResourceTagList"]) == 1
assert resp["ResourceTagList"][0] == {
"ResourceId": trail_arn,
"TagsList": [{"Key": "k1", "Value": "v1"}],
}
@mock_cloudtrail
@ -38,13 +39,11 @@ def test_remove_tags():
# Verify the first and third tag are still there
resp = client.list_tags(ResourceIdList=[trail_arn])
resp.should.have.key("ResourceTagList").length_of(1)
resp["ResourceTagList"][0].should.equal(
{
"ResourceId": trail_arn,
"TagsList": [{"Key": "tk", "Value": "tv"}, {"Key": "tk3", "Value": "tv3"}],
}
)
assert len(resp["ResourceTagList"]) == 1
assert resp["ResourceTagList"][0] == {
"ResourceId": trail_arn,
"TagsList": [{"Key": "tk", "Value": "tv"}, {"Key": "tk3", "Value": "tv3"}],
}
@mock_cloudtrail
@ -55,8 +54,8 @@ def test_create_trail_without_tags_and_list_tags():
trail_arn = resp["TrailARN"]
resp = client.list_tags(ResourceIdList=[trail_arn])
resp.should.have.key("ResourceTagList").length_of(1)
resp["ResourceTagList"][0].should.equal({"ResourceId": trail_arn, "TagsList": []})
assert len(resp["ResourceTagList"]) == 1
assert resp["ResourceTagList"][0] == {"ResourceId": trail_arn, "TagsList": []}
@mock_cloudtrail
@ -68,10 +67,8 @@ def test_create_trail_with_tags_and_list_tags():
trail_arn = resp["TrailARN"]
resp = client.list_tags(ResourceIdList=[trail_arn])
resp.should.have.key("ResourceTagList").length_of(1)
resp["ResourceTagList"][0].should.equal(
{
"ResourceId": trail_arn,
"TagsList": [{"Key": "tk", "Value": "tv"}, {"Key": "tk2", "Value": "tv2"}],
}
)
assert len(resp["ResourceTagList"]) == 1
assert resp["ResourceTagList"][0] == {
"ResourceId": trail_arn,
"TagsList": [{"Key": "tk", "Value": "tv"}, {"Key": "tk2", "Value": "tv2"}],
}

View File

@ -1,6 +1,5 @@
"""Test different server responses."""
import json
import sure # noqa # pylint: disable=unused-import
import moto.server as server
from moto import mock_cloudtrail
@ -16,4 +15,4 @@ def test_cloudtrail_list():
}
res = test_client.post("/", headers=headers)
data = json.loads(res.data)
data.should.equal({"Trails": []})
assert data == {"Trails": []}