TechDebt: MyPy CloudTrail and CodeBuild (#5621)

This commit is contained in:
Bert Blommers 2022-10-30 19:55:31 -01:00 committed by GitHub
parent 80db33cb44
commit cf5e7b750f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 205 additions and 180 deletions

View File

@ -5,28 +5,28 @@ from moto.core.exceptions import JsonRESTError
class InvalidParameterCombinationException(JsonRESTError): class InvalidParameterCombinationException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("InvalidParameterCombinationException", message) super().__init__("InvalidParameterCombinationException", message)
class S3BucketDoesNotExistException(JsonRESTError): class S3BucketDoesNotExistException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("S3BucketDoesNotExistException", message) super().__init__("S3BucketDoesNotExistException", message)
class InsufficientSnsTopicPolicyException(JsonRESTError): class InsufficientSnsTopicPolicyException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("InsufficientSnsTopicPolicyException", message) super().__init__("InsufficientSnsTopicPolicyException", message)
class TrailNotFoundException(JsonRESTError): class TrailNotFoundException(JsonRESTError):
code = 400 code = 400
def __init__(self, account_id, name): def __init__(self, account_id: str, name: str):
super().__init__( super().__init__(
"TrailNotFoundException", "TrailNotFoundException",
f"Unknown trail: {name} for the user: {account_id}", f"Unknown trail: {name} for the user: {account_id}",
@ -36,36 +36,36 @@ class TrailNotFoundException(JsonRESTError):
class InvalidTrailNameException(JsonRESTError): class InvalidTrailNameException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("InvalidTrailNameException", message) super().__init__("InvalidTrailNameException", message)
class TrailNameTooShort(InvalidTrailNameException): class TrailNameTooShort(InvalidTrailNameException):
def __init__(self, actual_length): def __init__(self, actual_length: int):
super().__init__( super().__init__(
f"Trail name too short. Minimum allowed length: 3 characters. Specified name length: {actual_length} characters." f"Trail name too short. Minimum allowed length: 3 characters. Specified name length: {actual_length} characters."
) )
class TrailNameTooLong(InvalidTrailNameException): class TrailNameTooLong(InvalidTrailNameException):
def __init__(self, actual_length): def __init__(self, actual_length: int):
super().__init__( super().__init__(
f"Trail name too long. Maximum allowed length: 128 characters. Specified name length: {actual_length} characters." f"Trail name too long. Maximum allowed length: 128 characters. Specified name length: {actual_length} characters."
) )
class TrailNameNotStartingCorrectly(InvalidTrailNameException): class TrailNameNotStartingCorrectly(InvalidTrailNameException):
def __init__(self): def __init__(self) -> None:
super().__init__("Trail name must starts with a letter or number.") super().__init__("Trail name must starts with a letter or number.")
class TrailNameNotEndingCorrectly(InvalidTrailNameException): class TrailNameNotEndingCorrectly(InvalidTrailNameException):
def __init__(self): def __init__(self) -> None:
super().__init__("Trail name must ends with a letter or number.") super().__init__("Trail name must ends with a letter or number.")
class TrailNameInvalidChars(InvalidTrailNameException): class TrailNameInvalidChars(InvalidTrailNameException):
def __init__(self): def __init__(self) -> None:
super().__init__( super().__init__(
"Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_)." "Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_)."
) )

View File

@ -2,6 +2,7 @@ import re
import time import time
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Iterable, Tuple
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds, BackendDict from moto.core.utils import iso_8601_datetime_without_milliseconds, BackendDict
from moto.utilities.tagging_service import TaggingService from moto.utilities.tagging_service import TaggingService
@ -17,20 +18,19 @@ from .exceptions import (
) )
def datetime2int(date): def datetime2int(date: datetime) -> int:
return int(time.mktime(date.timetuple())) return int(time.mktime(date.timetuple()))
class TrailStatus(object): class TrailStatus:
def __init__(self): def __init__(self) -> None:
self.is_logging = False self.is_logging = False
self.latest_delivery_time = "" self.latest_delivery_time: Optional[int] = None
self.latest_delivery_attempt = "" self.latest_delivery_attempt: Optional[str] = ""
self.start_logging_time = None self.started: Optional[datetime] = None
self.started = None self.stopped: Optional[datetime] = None
self.stopped = None
def start_logging(self): def start_logging(self) -> None:
self.is_logging = True self.is_logging = True
self.started = datetime.utcnow() self.started = datetime.utcnow()
self.latest_delivery_time = datetime2int(datetime.utcnow()) self.latest_delivery_time = datetime2int(datetime.utcnow())
@ -38,17 +38,17 @@ class TrailStatus(object):
datetime.utcnow() datetime.utcnow()
) )
def stop_logging(self): def stop_logging(self) -> None:
self.is_logging = False self.is_logging = False
self.stopped = datetime.utcnow() self.stopped = datetime.utcnow()
def description(self): def description(self) -> Dict[str, Any]:
if self.is_logging: if self.is_logging:
self.latest_delivery_time = datetime2int(datetime.utcnow()) self.latest_delivery_time = datetime2int(datetime.utcnow())
self.latest_delivery_attempt = iso_8601_datetime_without_milliseconds( self.latest_delivery_attempt = iso_8601_datetime_without_milliseconds(
datetime.utcnow() datetime.utcnow()
) )
desc = { desc: Dict[str, Any] = {
"IsLogging": self.is_logging, "IsLogging": self.is_logging,
"LatestDeliveryAttemptTime": self.latest_delivery_attempt, "LatestDeliveryAttemptTime": self.latest_delivery_attempt,
"LatestNotificationAttemptTime": "", "LatestNotificationAttemptTime": "",
@ -74,19 +74,19 @@ class TrailStatus(object):
class Trail(BaseModel): class Trail(BaseModel):
def __init__( def __init__(
self, self,
account_id, account_id: str,
region_name, region_name: str,
trail_name, trail_name: str,
bucket_name, bucket_name: str,
s3_key_prefix, s3_key_prefix: str,
sns_topic_name, sns_topic_name: str,
is_global, is_global: bool,
is_multi_region, is_multi_region: bool,
log_validation, log_validation: bool,
is_org_trail, is_org_trail: bool,
cw_log_group_arn, cw_log_group_arn: str,
cw_role_arn, cw_role_arn: str,
kms_key_id, kms_key_id: str,
): ):
self.account_id = account_id self.account_id = account_id
self.region_name = region_name self.region_name = region_name
@ -105,21 +105,21 @@ class Trail(BaseModel):
self.check_bucket_exists() self.check_bucket_exists()
self.check_topic_exists() self.check_topic_exists()
self.status = TrailStatus() self.status = TrailStatus()
self.event_selectors = list() self.event_selectors: List[Dict[str, Any]] = list()
self.advanced_event_selectors = list() self.advanced_event_selectors: List[Dict[str, Any]] = list()
self.insight_selectors = list() self.insight_selectors: List[Dict[str, str]] = list()
@property @property
def arn(self): def arn(self) -> str:
return f"arn:aws:cloudtrail:{self.region_name}:{self.account_id}:trail/{self.trail_name}" return f"arn:aws:cloudtrail:{self.region_name}:{self.account_id}:trail/{self.trail_name}"
@property @property
def topic_arn(self): def topic_arn(self) -> Optional[str]:
if self.sns_topic_name: if self.sns_topic_name:
return f"arn:aws:sns:{self.region_name}:{self.account_id}:{self.sns_topic_name}" return f"arn:aws:sns:{self.region_name}:{self.account_id}:{self.sns_topic_name}"
return None return None
def check_name(self): def check_name(self) -> None:
if len(self.trail_name) < 3: if len(self.trail_name) < 3:
raise TrailNameTooShort(actual_length=len(self.trail_name)) raise TrailNameTooShort(actual_length=len(self.trail_name))
if len(self.trail_name) > 128: if len(self.trail_name) > 128:
@ -131,7 +131,7 @@ class Trail(BaseModel):
if not re.match(r"^[.\-_0-9a-zA-Z]+$", self.trail_name): if not re.match(r"^[.\-_0-9a-zA-Z]+$", self.trail_name):
raise TrailNameInvalidChars() raise TrailNameInvalidChars()
def check_bucket_exists(self): def check_bucket_exists(self) -> None:
from moto.s3.models import s3_backends from moto.s3.models import s3_backends
try: try:
@ -141,7 +141,7 @@ class Trail(BaseModel):
f"S3 bucket {self.bucket_name} does not exist!" f"S3 bucket {self.bucket_name} does not exist!"
) )
def check_topic_exists(self): def check_topic_exists(self) -> None:
if self.sns_topic_name: if self.sns_topic_name:
from moto.sns import sns_backends from moto.sns import sns_backends
@ -153,41 +153,45 @@ class Trail(BaseModel):
"SNS Topic does not exist or the topic policy is incorrect!" "SNS Topic does not exist or the topic policy is incorrect!"
) )
def start_logging(self): def start_logging(self) -> None:
self.status.start_logging() self.status.start_logging()
def stop_logging(self): def stop_logging(self) -> None:
self.status.stop_logging() self.status.stop_logging()
def put_event_selectors(self, event_selectors, advanced_event_selectors): def put_event_selectors(
self,
event_selectors: List[Dict[str, Any]],
advanced_event_selectors: List[Dict[str, Any]],
) -> None:
if event_selectors: if event_selectors:
self.event_selectors = event_selectors self.event_selectors = event_selectors
elif advanced_event_selectors: elif advanced_event_selectors:
self.event_selectors = [] self.event_selectors = []
self.advanced_event_selectors = advanced_event_selectors self.advanced_event_selectors = advanced_event_selectors
def get_event_selectors(self): def get_event_selectors(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
return self.event_selectors, self.advanced_event_selectors return self.event_selectors, self.advanced_event_selectors
def put_insight_selectors(self, insight_selectors): def put_insight_selectors(self, insight_selectors: List[Dict[str, str]]) -> None:
self.insight_selectors.extend(insight_selectors) self.insight_selectors.extend(insight_selectors)
def get_insight_selectors(self): def get_insight_selectors(self) -> List[Dict[str, str]]:
return self.insight_selectors return self.insight_selectors
def update( def update(
self, self,
s3_bucket_name, s3_bucket_name: Optional[str],
s3_key_prefix, s3_key_prefix: Optional[str],
sns_topic_name, sns_topic_name: Optional[str],
include_global_service_events, include_global_service_events: Optional[bool],
is_multi_region_trail, is_multi_region_trail: Optional[bool],
enable_log_file_validation, enable_log_file_validation: Optional[bool],
is_organization_trail, is_organization_trail: Optional[bool],
cw_log_group_arn, cw_log_group_arn: Optional[str],
cw_role_arn, cw_role_arn: Optional[str],
kms_key_id, kms_key_id: Optional[str],
): ) -> None:
if s3_bucket_name is not None: if s3_bucket_name is not None:
self.bucket_name = s3_bucket_name self.bucket_name = s3_bucket_name
if s3_key_prefix is not None: if s3_key_prefix is not None:
@ -209,14 +213,14 @@ class Trail(BaseModel):
if kms_key_id is not None: if kms_key_id is not None:
self.kms_key_id = kms_key_id self.kms_key_id = kms_key_id
def short(self): def short(self) -> Dict[str, str]:
return { return {
"Name": self.trail_name, "Name": self.trail_name,
"TrailARN": self.arn, "TrailARN": self.arn,
"HomeRegion": self.region_name, "HomeRegion": self.region_name,
} }
def description(self, include_region=False): def description(self, include_region: bool = False) -> Dict[str, Any]:
desc = { desc = {
"Name": self.trail_name, "Name": self.trail_name,
"S3BucketName": self.bucket_name, "S3BucketName": self.bucket_name,
@ -244,26 +248,26 @@ class Trail(BaseModel):
class CloudTrailBackend(BaseBackend): class CloudTrailBackend(BaseBackend):
"""Implementation of CloudTrail APIs.""" """Implementation of CloudTrail APIs."""
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.trails = dict() self.trails: Dict[str, Trail] = dict()
self.tagging_service = TaggingService(tag_name="TagsList") self.tagging_service = TaggingService(tag_name="TagsList")
def create_trail( def create_trail(
self, self,
name, name: str,
bucket_name, bucket_name: str,
s3_key_prefix, s3_key_prefix: str,
sns_topic_name, sns_topic_name: str,
is_global, is_global: bool,
is_multi_region, is_multi_region: bool,
log_validation, log_validation: bool,
is_org_trail, is_org_trail: bool,
cw_log_group_arn, cw_log_group_arn: str,
cw_role_arn, cw_role_arn: str,
kms_key_id, kms_key_id: str,
tags_list, tags_list: List[Dict[str, str]],
): ) -> Trail:
trail = Trail( trail = Trail(
self.account_id, self.account_id,
self.region_name, self.region_name,
@ -283,7 +287,7 @@ class CloudTrailBackend(BaseBackend):
self.tagging_service.tag_resource(trail.arn, tags_list) self.tagging_service.tag_resource(trail.arn, tags_list)
return trail return trail
def get_trail(self, name_or_arn): def get_trail(self, name_or_arn: str) -> Trail:
if len(name_or_arn) < 3: if len(name_or_arn) < 3:
raise TrailNameTooShort(actual_length=len(name_or_arn)) raise TrailNameTooShort(actual_length=len(name_or_arn))
if name_or_arn in self.trails: if name_or_arn in self.trails:
@ -293,7 +297,7 @@ class CloudTrailBackend(BaseBackend):
return trail return trail
raise TrailNotFoundException(account_id=self.account_id, name=name_or_arn) raise TrailNotFoundException(account_id=self.account_id, name=name_or_arn)
def get_trail_status(self, name): def get_trail_status(self, name: str) -> TrailStatus:
if len(name) < 3: if len(name) < 3:
raise TrailNameTooShort(actual_length=len(name)) raise TrailNameTooShort(actual_length=len(name))
trail_name = next( trail_name = next(
@ -313,7 +317,7 @@ class CloudTrailBackend(BaseBackend):
trail = self.trails[trail_name] trail = self.trails[trail_name]
return trail.status return trail.status
def describe_trails(self, include_shadow_trails): def describe_trails(self, include_shadow_trails: bool) -> Iterable[Trail]:
all_trails = [] all_trails = []
if include_shadow_trails: if include_shadow_trails:
current_account = cloudtrail_backends[self.account_id] current_account = cloudtrail_backends[self.account_id]
@ -325,35 +329,35 @@ class CloudTrailBackend(BaseBackend):
all_trails.extend(self.trails.values()) all_trails.extend(self.trails.values())
return all_trails return all_trails
def list_trails(self): def list_trails(self) -> Iterable[Trail]:
return self.describe_trails(include_shadow_trails=True) return self.describe_trails(include_shadow_trails=True)
def start_logging(self, name): def start_logging(self, name: str) -> None:
trail = self.trails[name] trail = self.trails[name]
trail.start_logging() trail.start_logging()
def stop_logging(self, name): def stop_logging(self, name: str) -> None:
trail = self.trails[name] trail = self.trails[name]
trail.stop_logging() trail.stop_logging()
def delete_trail(self, name): def delete_trail(self, name: str) -> None:
if name in self.trails: if name in self.trails:
del self.trails[name] del self.trails[name]
def update_trail( def update_trail(
self, self,
name, name: str,
s3_bucket_name, s3_bucket_name: str,
s3_key_prefix, s3_key_prefix: str,
sns_topic_name, sns_topic_name: str,
include_global_service_events, include_global_service_events: bool,
is_multi_region_trail, is_multi_region_trail: bool,
enable_log_file_validation, enable_log_file_validation: bool,
is_organization_trail, is_organization_trail: bool,
cw_log_group_arn, cw_log_group_arn: str,
cw_role_arn, cw_role_arn: str,
kms_key_id, kms_key_id: str,
): ) -> Trail:
trail = self.get_trail(name_or_arn=name) trail = self.get_trail(name_or_arn=name)
trail.update( trail.update(
s3_bucket_name=s3_bucket_name, s3_bucket_name=s3_bucket_name,
@ -370,41 +374,50 @@ class CloudTrailBackend(BaseBackend):
return trail return trail
def put_event_selectors( def put_event_selectors(
self, trail_name, event_selectors, advanced_event_selectors self,
): trail_name: str,
event_selectors: List[Dict[str, Any]],
advanced_event_selectors: List[Dict[str, Any]],
) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, Any]]]:
trail = self.get_trail(trail_name) trail = self.get_trail(trail_name)
trail.put_event_selectors(event_selectors, advanced_event_selectors) trail.put_event_selectors(event_selectors, advanced_event_selectors)
trail_arn = trail.arn trail_arn = trail.arn
return trail_arn, event_selectors, advanced_event_selectors return trail_arn, event_selectors, advanced_event_selectors
def get_event_selectors(self, trail_name): def get_event_selectors(
self, trail_name: str
) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, Any]]]:
trail = self.get_trail(trail_name) trail = self.get_trail(trail_name)
event_selectors, advanced_event_selectors = trail.get_event_selectors() event_selectors, advanced_event_selectors = trail.get_event_selectors()
return trail.arn, event_selectors, advanced_event_selectors return trail.arn, event_selectors, advanced_event_selectors
def add_tags(self, resource_id, tags_list): def add_tags(self, resource_id: str, tags_list: List[Dict[str, str]]) -> None:
self.tagging_service.tag_resource(resource_id, tags_list) self.tagging_service.tag_resource(resource_id, tags_list)
def remove_tags(self, resource_id, tags_list): def remove_tags(self, resource_id: str, tags_list: List[Dict[str, str]]) -> None:
self.tagging_service.untag_resource_using_tags(resource_id, tags_list) self.tagging_service.untag_resource_using_tags(resource_id, tags_list)
def list_tags(self, resource_id_list): def list_tags(self, resource_id_list: List[str]) -> List[Dict[str, Any]]:
""" """
Pagination is not yet implemented Pagination is not yet implemented
""" """
resp = [{"ResourceId": r_id} for r_id in resource_id_list] resp: List[Dict[str, Any]] = [{"ResourceId": r_id} for r_id in resource_id_list]
for item in resp: for item in resp:
item["TagsList"] = self.tagging_service.list_tags_for_resource( item["TagsList"] = self.tagging_service.list_tags_for_resource(
item["ResourceId"] item["ResourceId"]
)["TagsList"] )["TagsList"]
return resp return resp
def put_insight_selectors(self, trail_name, insight_selectors): def put_insight_selectors(
self, trail_name: str, insight_selectors: List[Dict[str, str]]
) -> Tuple[str, List[Dict[str, str]]]:
trail = self.get_trail(trail_name) trail = self.get_trail(trail_name)
trail.put_insight_selectors(insight_selectors) trail.put_insight_selectors(insight_selectors)
return trail.arn, insight_selectors return trail.arn, insight_selectors
def get_insight_selectors(self, trail_name): def get_insight_selectors(
self, trail_name: str
) -> Tuple[str, List[Dict[str, str]]]:
trail = self.get_trail(trail_name) trail = self.get_trail(trail_name)
return trail.arn, trail.get_insight_selectors() return trail.arn, trail.get_insight_selectors()

View File

@ -1,23 +1,23 @@
"""Handles incoming cloudtrail requests, invokes methods, returns responses.""" """Handles incoming cloudtrail requests, invokes methods, returns responses."""
import json import json
from typing import Any, Dict
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import cloudtrail_backends from .models import cloudtrail_backends, CloudTrailBackend
from .exceptions import InvalidParameterCombinationException from .exceptions import InvalidParameterCombinationException
class CloudTrailResponse(BaseResponse): class CloudTrailResponse(BaseResponse):
"""Handler for CloudTrail requests and responses.""" """Handler for CloudTrail requests and responses."""
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="cloudtrail") super().__init__(service_name="cloudtrail")
@property @property
def cloudtrail_backend(self): def cloudtrail_backend(self) -> CloudTrailBackend:
"""Return backend instance specific for this region.""" """Return backend instance specific for this region."""
return cloudtrail_backends[self.current_account][self.region] return cloudtrail_backends[self.current_account][self.region]
def create_trail(self): def create_trail(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
bucket_name = self._get_param("S3BucketName") bucket_name = self._get_param("S3BucketName")
is_global = self._get_bool_param("IncludeGlobalServiceEvents", True) is_global = self._get_bool_param("IncludeGlobalServiceEvents", True)
@ -50,43 +50,43 @@ class CloudTrailResponse(BaseResponse):
) )
return json.dumps(trail.description()) return json.dumps(trail.description())
def get_trail(self): def get_trail(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
trail = self.cloudtrail_backend.get_trail(name) trail = self.cloudtrail_backend.get_trail(name)
return json.dumps({"Trail": trail.description()}) return json.dumps({"Trail": trail.description()})
def get_trail_status(self): def get_trail_status(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
status = self.cloudtrail_backend.get_trail_status(name) status = self.cloudtrail_backend.get_trail_status(name)
return json.dumps(status.description()) return json.dumps(status.description())
def describe_trails(self): def describe_trails(self) -> str:
include_shadow_trails = self._get_bool_param("includeShadowTrails", True) include_shadow_trails = self._get_bool_param("includeShadowTrails", True)
trails = self.cloudtrail_backend.describe_trails(include_shadow_trails) trails = self.cloudtrail_backend.describe_trails(include_shadow_trails)
return json.dumps( return json.dumps(
{"trailList": [t.description(include_region=True) for t in trails]} {"trailList": [t.description(include_region=True) for t in trails]}
) )
def list_trails(self): def list_trails(self) -> str:
all_trails = self.cloudtrail_backend.list_trails() all_trails = self.cloudtrail_backend.list_trails()
return json.dumps({"Trails": [t.short() for t in all_trails]}) return json.dumps({"Trails": [t.short() for t in all_trails]})
def start_logging(self): def start_logging(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
self.cloudtrail_backend.start_logging(name) self.cloudtrail_backend.start_logging(name)
return json.dumps({}) return json.dumps({})
def stop_logging(self): def stop_logging(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
self.cloudtrail_backend.stop_logging(name) self.cloudtrail_backend.stop_logging(name)
return json.dumps({}) return json.dumps({})
def delete_trail(self): def delete_trail(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
self.cloudtrail_backend.delete_trail(name) self.cloudtrail_backend.delete_trail(name)
return json.dumps({}) return json.dumps({})
def update_trail(self): def update_trail(self) -> str:
name = self._get_param("Name") name = self._get_param("Name")
s3_bucket_name = self._get_param("S3BucketName") s3_bucket_name = self._get_param("S3BucketName")
s3_key_prefix = self._get_param("S3KeyPrefix") s3_key_prefix = self._get_param("S3KeyPrefix")
@ -113,7 +113,7 @@ class CloudTrailResponse(BaseResponse):
) )
return json.dumps(trail.description()) return json.dumps(trail.description())
def put_event_selectors(self): def put_event_selectors(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
trail_name = params.get("TrailName") trail_name = params.get("TrailName")
event_selectors = params.get("EventSelectors") event_selectors = params.get("EventSelectors")
@ -135,7 +135,7 @@ class CloudTrailResponse(BaseResponse):
) )
) )
def get_event_selectors(self): def get_event_selectors(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
trail_name = params.get("TrailName") trail_name = params.get("TrailName")
( (
@ -151,14 +151,14 @@ class CloudTrailResponse(BaseResponse):
) )
) )
def add_tags(self): def add_tags(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
resource_id = params.get("ResourceId") resource_id = params.get("ResourceId")
tags_list = params.get("TagsList") tags_list = params.get("TagsList")
self.cloudtrail_backend.add_tags(resource_id=resource_id, tags_list=tags_list) self.cloudtrail_backend.add_tags(resource_id=resource_id, tags_list=tags_list)
return json.dumps(dict()) return json.dumps(dict())
def remove_tags(self): def remove_tags(self) -> str:
resource_id = self._get_param("ResourceId") resource_id = self._get_param("ResourceId")
tags_list = self._get_param("TagsList") tags_list = self._get_param("TagsList")
self.cloudtrail_backend.remove_tags( self.cloudtrail_backend.remove_tags(
@ -166,7 +166,7 @@ class CloudTrailResponse(BaseResponse):
) )
return json.dumps(dict()) return json.dumps(dict())
def list_tags(self): def list_tags(self) -> str:
params = json.loads(self.body) params = json.loads(self.body)
resource_id_list = params.get("ResourceIdList") resource_id_list = params.get("ResourceIdList")
resource_tag_list = self.cloudtrail_backend.list_tags( resource_tag_list = self.cloudtrail_backend.list_tags(
@ -174,7 +174,7 @@ class CloudTrailResponse(BaseResponse):
) )
return json.dumps(dict(ResourceTagList=resource_tag_list)) return json.dumps(dict(ResourceTagList=resource_tag_list))
def put_insight_selectors(self): def put_insight_selectors(self) -> str:
trail_name = self._get_param("TrailName") trail_name = self._get_param("TrailName")
insight_selectors = self._get_param("InsightSelectors") insight_selectors = self._get_param("InsightSelectors")
trail_arn, insight_selectors = self.cloudtrail_backend.put_insight_selectors( trail_arn, insight_selectors = self.cloudtrail_backend.put_insight_selectors(
@ -182,12 +182,12 @@ class CloudTrailResponse(BaseResponse):
) )
return json.dumps(dict(TrailARN=trail_arn, InsightSelectors=insight_selectors)) return json.dumps(dict(TrailARN=trail_arn, InsightSelectors=insight_selectors))
def get_insight_selectors(self): def get_insight_selectors(self) -> str:
trail_name = self._get_param("TrailName") trail_name = self._get_param("TrailName")
trail_arn, insight_selectors = self.cloudtrail_backend.get_insight_selectors( trail_arn, insight_selectors = self.cloudtrail_backend.get_insight_selectors(
trail_name=trail_name trail_name=trail_name
) )
resp = {"TrailARN": trail_arn} resp: Dict[str, Any] = {"TrailARN": trail_arn}
if insight_selectors: if insight_selectors:
resp["InsightSelectors"] = insight_selectors resp["InsightSelectors"] = insight_selectors
return json.dumps(resp) return json.dumps(resp)

View File

@ -6,19 +6,19 @@ from moto.core.exceptions import JsonRESTError
class InvalidInputException(JsonRESTError): class InvalidInputException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("InvalidInputException", message) super().__init__("InvalidInputException", message)
class ResourceNotFoundException(JsonRESTError): class ResourceNotFoundException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("ResourceNotFoundException", message) super().__init__("ResourceNotFoundException", message)
class ResourceAlreadyExistsException(JsonRESTError): class ResourceAlreadyExistsException(JsonRESTError):
code = 400 code = 400
def __init__(self, message): def __init__(self, message: str):
super().__init__("ResourceAlreadyExistsException", message) super().__init__("ResourceAlreadyExistsException", message)

View File

@ -3,22 +3,23 @@ from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
from moto.moto_api._internal import mock_random from moto.moto_api._internal import mock_random
from collections import defaultdict from collections import defaultdict
from dateutil import parser from dateutil import parser
from typing import Any, Dict, List, Optional
import datetime import datetime
class CodeBuildProjectMetadata(BaseModel): class CodeBuildProjectMetadata(BaseModel):
def __init__( def __init__(
self, self,
account_id, account_id: str,
region_name, region_name: str,
project_name, project_name: str,
source_version, source_version: Optional[str],
artifacts, artifacts: Optional[Dict[str, Any]],
build_id, build_id: str,
service_role, service_role: str,
): ):
current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow())
self.build_metadata = dict() self.build_metadata: Dict[str, Any] = dict()
self.build_metadata["id"] = build_id self.build_metadata["id"] = build_id
self.build_metadata[ self.build_metadata[
@ -90,16 +91,16 @@ class CodeBuildProjectMetadata(BaseModel):
class CodeBuild(BaseModel): class CodeBuild(BaseModel):
def __init__( def __init__(
self, self,
account_id, account_id: str,
region, region: str,
project_name, project_name: str,
project_source, project_source: Dict[str, Any],
artifacts, artifacts: Dict[str, Any],
environment, environment: Dict[str, Any],
serviceRole="some_role", serviceRole: str = "some_role",
): ):
current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow())
self.project_metadata = dict() self.project_metadata: Dict[str, Any] = dict()
self.project_metadata["name"] = project_name self.project_metadata["name"] = project_name
self.project_metadata["arn"] = "arn:aws:codebuild:{0}:{1}:project/{2}".format( self.project_metadata["arn"] = "arn:aws:codebuild:{0}:{1}:project/{2}".format(
@ -127,16 +128,21 @@ class CodeBuild(BaseModel):
class CodeBuildBackend(BaseBackend): class CodeBuildBackend(BaseBackend):
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.codebuild_projects = dict() self.codebuild_projects: Dict[str, CodeBuild] = dict()
self.build_history = dict() self.build_history: Dict[str, List[str]] = dict()
self.build_metadata = dict() self.build_metadata: Dict[str, CodeBuildProjectMetadata] = dict()
self.build_metadata_history = defaultdict(list) self.build_metadata_history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
def create_project( def create_project(
self, project_name, project_source, artifacts, environment, service_role self,
): project_name: str,
project_source: Dict[str, Any],
artifacts: Dict[str, Any],
environment: Dict[str, Any],
service_role: str,
) -> Dict[str, Any]:
# required in other functions that don't # required in other functions that don't
self.project_name = project_name self.project_name = project_name
self.service_role = service_role self.service_role = service_role
@ -156,7 +162,7 @@ class CodeBuildBackend(BaseBackend):
return self.codebuild_projects[project_name].project_metadata return self.codebuild_projects[project_name].project_metadata
def list_projects(self): def list_projects(self) -> List[str]:
projects = [] projects = []
@ -165,7 +171,12 @@ class CodeBuildBackend(BaseBackend):
return projects return projects
def start_build(self, project_name, source_version=None, artifact_override=None): def start_build(
self,
project_name: str,
source_version: Optional[str] = None,
artifact_override: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
build_id = "{0}:{1}".format(project_name, mock_random.uuid4()) build_id = "{0}:{1}".format(project_name, mock_random.uuid4())
@ -189,7 +200,7 @@ class CodeBuildBackend(BaseBackend):
return self.build_metadata[project_name].build_metadata return self.build_metadata[project_name].build_metadata
def _set_phases(self, phases): def _set_phases(self, phases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow())
# No phaseStatus for QUEUED on first start # No phaseStatus for QUEUED on first start
for existing_phase in phases: for existing_phase in phases:
@ -209,7 +220,7 @@ class CodeBuildBackend(BaseBackend):
] ]
for status in statuses: for status in statuses:
phase = dict() phase: Dict[str, Any] = dict()
phase["phaseType"] = status phase["phaseType"] = status
phase["phaseStatus"] = "SUCCEEDED" phase["phaseStatus"] = "SUCCEEDED"
phase["startTime"] = current_date phase["startTime"] = current_date
@ -219,8 +230,8 @@ class CodeBuildBackend(BaseBackend):
return phases return phases
def batch_get_builds(self, ids): def batch_get_builds(self, ids: List[str]) -> List[Dict[str, Any]]:
batch_build_metadata = [] batch_build_metadata: List[Dict[str, Any]] = []
for metadata in self.build_metadata_history.values(): for metadata in self.build_metadata_history.values():
for build in metadata: for build in metadata:
@ -237,24 +248,24 @@ class CodeBuildBackend(BaseBackend):
return batch_build_metadata return batch_build_metadata
def list_builds_for_project(self, project_name): def list_builds_for_project(self, project_name: str) -> List[str]:
try: try:
return self.build_history[project_name] return self.build_history[project_name]
except KeyError: except KeyError:
return list() return list()
def list_builds(self): def list_builds(self) -> List[str]:
ids = [] ids = []
for build_ids in self.build_history.values(): for build_ids in self.build_history.values():
ids += build_ids ids += build_ids
return ids return ids
def delete_project(self, project_name): def delete_project(self, project_name: str) -> None:
self.build_metadata.pop(project_name, None) self.build_metadata.pop(project_name, None)
self.codebuild_projects.pop(project_name, None) self.codebuild_projects.pop(project_name, None)
def stop_build(self, build_id): def stop_build(self, build_id: str) -> Optional[Dict[str, Any]]: # type: ignore[return]
for metadata in self.build_metadata_history.values(): for metadata in self.build_metadata_history.values():
for build in metadata: for build in metadata:

View File

@ -1,5 +1,5 @@
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import codebuild_backends from .models import codebuild_backends, CodeBuildBackend
from .exceptions import ( from .exceptions import (
InvalidInputException, InvalidInputException,
ResourceAlreadyExistsException, ResourceAlreadyExistsException,
@ -7,9 +7,10 @@ from .exceptions import (
) )
import json import json
import re import re
from typing import Any, Dict, List
def _validate_required_params_source(source): def _validate_required_params_source(source: Dict[str, Any]) -> None:
if source["type"] not in [ if source["type"] not in [
"BITBUCKET", "BITBUCKET",
"CODECOMMIT", "CODECOMMIT",
@ -28,14 +29,14 @@ def _validate_required_params_source(source):
raise InvalidInputException("Project source location is required") raise InvalidInputException("Project source location is required")
def _validate_required_params_service_role(account_id, service_role): def _validate_required_params_service_role(account_id: str, service_role: str) -> None:
if f"arn:aws:iam::{account_id}:role/service-role/" not in service_role: if f"arn:aws:iam::{account_id}:role/service-role/" not in service_role:
raise InvalidInputException( raise InvalidInputException(
"Invalid service role: Service role account ID does not match caller's account" "Invalid service role: Service role account ID does not match caller's account"
) )
def _validate_required_params_artifacts(artifacts): def _validate_required_params_artifacts(artifacts: Dict[str, Any]) -> None:
if artifacts["type"] not in ["CODEPIPELINE", "S3", "NO_ARTIFACTS"]: if artifacts["type"] not in ["CODEPIPELINE", "S3", "NO_ARTIFACTS"]:
raise InvalidInputException("Invalid type provided: Artifact type") raise InvalidInputException("Invalid type provided: Artifact type")
@ -49,7 +50,7 @@ def _validate_required_params_artifacts(artifacts):
raise InvalidInputException("Project source location is required") raise InvalidInputException("Project source location is required")
def _validate_required_params_environment(environment): def _validate_required_params_environment(environment: Dict[str, Any]) -> None:
if environment["type"] not in [ if environment["type"] not in [
"WINDOWS_CONTAINER", "WINDOWS_CONTAINER",
@ -72,7 +73,7 @@ def _validate_required_params_environment(environment):
) )
def _validate_required_params_project_name(name): def _validate_required_params_project_name(name: str) -> None:
if len(name) >= 150: if len(name) >= 150:
raise InvalidInputException( raise InvalidInputException(
"Only alphanumeric characters, dash, and underscore are supported" "Only alphanumeric characters, dash, and underscore are supported"
@ -84,7 +85,7 @@ def _validate_required_params_project_name(name):
) )
def _validate_required_params_id(build_id, build_ids): def _validate_required_params_id(build_id: str, build_ids: List[str]) -> None:
if ":" not in build_id: if ":" not in build_id:
raise InvalidInputException("Invalid build ID provided") raise InvalidInputException("Invalid build ID provided")
@ -94,10 +95,10 @@ def _validate_required_params_id(build_id, build_ids):
class CodeBuildResponse(BaseResponse): class CodeBuildResponse(BaseResponse):
@property @property
def codebuild_backend(self): def codebuild_backend(self) -> CodeBuildBackend:
return codebuild_backends[self.current_account][self.region] return codebuild_backends[self.current_account][self.region]
def list_builds_for_project(self): def list_builds_for_project(self) -> str:
_validate_required_params_project_name(self._get_param("projectName")) _validate_required_params_project_name(self._get_param("projectName"))
if ( if (
@ -116,7 +117,7 @@ class CodeBuildResponse(BaseResponse):
return json.dumps({"ids": ids}) return json.dumps({"ids": ids})
def create_project(self): def create_project(self) -> str:
_validate_required_params_source(self._get_param("source")) _validate_required_params_source(self._get_param("source"))
_validate_required_params_service_role( _validate_required_params_service_role(
self.current_account, self._get_param("serviceRole") self.current_account, self._get_param("serviceRole")
@ -142,11 +143,11 @@ class CodeBuildResponse(BaseResponse):
return json.dumps({"project": project_metadata}) return json.dumps({"project": project_metadata})
def list_projects(self): def list_projects(self) -> str:
project_metadata = self.codebuild_backend.list_projects() project_metadata = self.codebuild_backend.list_projects()
return json.dumps({"projects": project_metadata}) return json.dumps({"projects": project_metadata})
def start_build(self): def start_build(self) -> str:
_validate_required_params_project_name(self._get_param("projectName")) _validate_required_params_project_name(self._get_param("projectName"))
if ( if (
@ -166,7 +167,7 @@ class CodeBuildResponse(BaseResponse):
) )
return json.dumps({"build": metadata}) return json.dumps({"build": metadata})
def batch_get_builds(self): def batch_get_builds(self) -> str:
for build_id in self._get_param("ids"): for build_id in self._get_param("ids"):
if ":" not in build_id: if ":" not in build_id:
raise InvalidInputException("Invalid build ID provided") raise InvalidInputException("Invalid build ID provided")
@ -174,17 +175,17 @@ class CodeBuildResponse(BaseResponse):
metadata = self.codebuild_backend.batch_get_builds(self._get_param("ids")) metadata = self.codebuild_backend.batch_get_builds(self._get_param("ids"))
return json.dumps({"builds": metadata}) return json.dumps({"builds": metadata})
def list_builds(self): def list_builds(self) -> str:
ids = self.codebuild_backend.list_builds() ids = self.codebuild_backend.list_builds()
return json.dumps({"ids": ids}) return json.dumps({"ids": ids})
def delete_project(self): def delete_project(self) -> str:
_validate_required_params_project_name(self._get_param("name")) _validate_required_params_project_name(self._get_param("name"))
self.codebuild_backend.delete_project(self._get_param("name")) self.codebuild_backend.delete_project(self._get_param("name"))
return return "{}"
def stop_build(self): def stop_build(self) -> str:
_validate_required_params_id( _validate_required_params_id(
self._get_param("id"), self.codebuild_backend.list_builds() self._get_param("id"), self.codebuild_backend.list_builds()
) )

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy] [mypy]
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild
show_column_numbers=True show_column_numbers=True
show_error_codes = True show_error_codes = True
disable_error_code=abstract disable_error_code=abstract