From cf5e7b750f272120c463635764f0af6f33f8e5f3 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 30 Oct 2022 19:55:31 -0100 Subject: [PATCH] TechDebt: MyPy CloudTrail and CodeBuild (#5621) --- moto/cloudtrail/exceptions.py | 20 ++-- moto/cloudtrail/models.py | 203 ++++++++++++++++++---------------- moto/cloudtrail/responses.py | 42 +++---- moto/codebuild/exceptions.py | 6 +- moto/codebuild/models.py | 77 +++++++------ moto/codebuild/responses.py | 35 +++--- setup.cfg | 2 +- 7 files changed, 205 insertions(+), 180 deletions(-) diff --git a/moto/cloudtrail/exceptions.py b/moto/cloudtrail/exceptions.py index 5c26a47e3..305118d70 100644 --- a/moto/cloudtrail/exceptions.py +++ b/moto/cloudtrail/exceptions.py @@ -5,28 +5,28 @@ from moto.core.exceptions import JsonRESTError class InvalidParameterCombinationException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidParameterCombinationException", message) class S3BucketDoesNotExistException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("S3BucketDoesNotExistException", message) class InsufficientSnsTopicPolicyException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InsufficientSnsTopicPolicyException", message) class TrailNotFoundException(JsonRESTError): code = 400 - def __init__(self, account_id, name): + def __init__(self, account_id: str, name: str): super().__init__( "TrailNotFoundException", f"Unknown trail: {name} for the user: {account_id}", @@ -36,36 +36,36 @@ class TrailNotFoundException(JsonRESTError): class InvalidTrailNameException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidTrailNameException", message) class TrailNameTooShort(InvalidTrailNameException): - def __init__(self, actual_length): + def __init__(self, actual_length: int): super().__init__( f"Trail name too short. Minimum allowed length: 3 characters. Specified name length: {actual_length} characters." ) class TrailNameTooLong(InvalidTrailNameException): - def __init__(self, actual_length): + def __init__(self, actual_length: int): super().__init__( f"Trail name too long. Maximum allowed length: 128 characters. Specified name length: {actual_length} characters." ) class TrailNameNotStartingCorrectly(InvalidTrailNameException): - def __init__(self): + def __init__(self) -> None: super().__init__("Trail name must starts with a letter or number.") class TrailNameNotEndingCorrectly(InvalidTrailNameException): - def __init__(self): + def __init__(self) -> None: super().__init__("Trail name must ends with a letter or number.") class TrailNameInvalidChars(InvalidTrailNameException): - def __init__(self): + def __init__(self) -> None: super().__init__( "Trail name or ARN can only contain uppercase letters, lowercase letters, numbers, periods (.), hyphens (-), and underscores (_)." ) diff --git a/moto/cloudtrail/models.py b/moto/cloudtrail/models.py index ca7fe8916..0e3343d83 100644 --- a/moto/cloudtrail/models.py +++ b/moto/cloudtrail/models.py @@ -2,6 +2,7 @@ import re import time from datetime import datetime +from typing import Any, Dict, List, Optional, Iterable, Tuple from moto.core import BaseBackend, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds, BackendDict from moto.utilities.tagging_service import TaggingService @@ -17,20 +18,19 @@ from .exceptions import ( ) -def datetime2int(date): +def datetime2int(date: datetime) -> int: return int(time.mktime(date.timetuple())) -class TrailStatus(object): - def __init__(self): +class TrailStatus: + def __init__(self) -> None: self.is_logging = False - self.latest_delivery_time = "" - self.latest_delivery_attempt = "" - self.start_logging_time = None - self.started = None - self.stopped = None + self.latest_delivery_time: Optional[int] = None + self.latest_delivery_attempt: Optional[str] = "" + self.started: Optional[datetime] = None + self.stopped: Optional[datetime] = None - def start_logging(self): + def start_logging(self) -> None: self.is_logging = True self.started = datetime.utcnow() self.latest_delivery_time = datetime2int(datetime.utcnow()) @@ -38,17 +38,17 @@ class TrailStatus(object): datetime.utcnow() ) - def stop_logging(self): + def stop_logging(self) -> None: self.is_logging = False self.stopped = datetime.utcnow() - def description(self): + def description(self) -> Dict[str, Any]: if self.is_logging: self.latest_delivery_time = datetime2int(datetime.utcnow()) self.latest_delivery_attempt = iso_8601_datetime_without_milliseconds( datetime.utcnow() ) - desc = { + desc: Dict[str, Any] = { "IsLogging": self.is_logging, "LatestDeliveryAttemptTime": self.latest_delivery_attempt, "LatestNotificationAttemptTime": "", @@ -74,19 +74,19 @@ class TrailStatus(object): class Trail(BaseModel): def __init__( self, - account_id, - region_name, - trail_name, - bucket_name, - s3_key_prefix, - sns_topic_name, - is_global, - is_multi_region, - log_validation, - is_org_trail, - cw_log_group_arn, - cw_role_arn, - kms_key_id, + account_id: str, + region_name: str, + trail_name: str, + bucket_name: str, + s3_key_prefix: str, + sns_topic_name: str, + is_global: bool, + is_multi_region: bool, + log_validation: bool, + is_org_trail: bool, + cw_log_group_arn: str, + cw_role_arn: str, + kms_key_id: str, ): self.account_id = account_id self.region_name = region_name @@ -105,21 +105,21 @@ class Trail(BaseModel): self.check_bucket_exists() self.check_topic_exists() self.status = TrailStatus() - self.event_selectors = list() - self.advanced_event_selectors = list() - self.insight_selectors = list() + self.event_selectors: List[Dict[str, Any]] = list() + self.advanced_event_selectors: List[Dict[str, Any]] = list() + self.insight_selectors: List[Dict[str, str]] = list() @property - def arn(self): + def arn(self) -> str: return f"arn:aws:cloudtrail:{self.region_name}:{self.account_id}:trail/{self.trail_name}" @property - def topic_arn(self): + def topic_arn(self) -> Optional[str]: if self.sns_topic_name: return f"arn:aws:sns:{self.region_name}:{self.account_id}:{self.sns_topic_name}" return None - def check_name(self): + def check_name(self) -> None: if len(self.trail_name) < 3: raise TrailNameTooShort(actual_length=len(self.trail_name)) if len(self.trail_name) > 128: @@ -131,7 +131,7 @@ class Trail(BaseModel): if not re.match(r"^[.\-_0-9a-zA-Z]+$", self.trail_name): raise TrailNameInvalidChars() - def check_bucket_exists(self): + def check_bucket_exists(self) -> None: from moto.s3.models import s3_backends try: @@ -141,7 +141,7 @@ class Trail(BaseModel): f"S3 bucket {self.bucket_name} does not exist!" ) - def check_topic_exists(self): + def check_topic_exists(self) -> None: if self.sns_topic_name: from moto.sns import sns_backends @@ -153,41 +153,45 @@ class Trail(BaseModel): "SNS Topic does not exist or the topic policy is incorrect!" ) - def start_logging(self): + def start_logging(self) -> None: self.status.start_logging() - def stop_logging(self): + def stop_logging(self) -> None: self.status.stop_logging() - def put_event_selectors(self, event_selectors, advanced_event_selectors): + def put_event_selectors( + self, + event_selectors: List[Dict[str, Any]], + advanced_event_selectors: List[Dict[str, Any]], + ) -> None: if event_selectors: self.event_selectors = event_selectors elif advanced_event_selectors: self.event_selectors = [] self.advanced_event_selectors = advanced_event_selectors - def get_event_selectors(self): + def get_event_selectors(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: return self.event_selectors, self.advanced_event_selectors - def put_insight_selectors(self, insight_selectors): + def put_insight_selectors(self, insight_selectors: List[Dict[str, str]]) -> None: self.insight_selectors.extend(insight_selectors) - def get_insight_selectors(self): + def get_insight_selectors(self) -> List[Dict[str, str]]: return self.insight_selectors def update( self, - s3_bucket_name, - s3_key_prefix, - sns_topic_name, - include_global_service_events, - is_multi_region_trail, - enable_log_file_validation, - is_organization_trail, - cw_log_group_arn, - cw_role_arn, - kms_key_id, - ): + s3_bucket_name: Optional[str], + s3_key_prefix: Optional[str], + sns_topic_name: Optional[str], + include_global_service_events: Optional[bool], + is_multi_region_trail: Optional[bool], + enable_log_file_validation: Optional[bool], + is_organization_trail: Optional[bool], + cw_log_group_arn: Optional[str], + cw_role_arn: Optional[str], + kms_key_id: Optional[str], + ) -> None: if s3_bucket_name is not None: self.bucket_name = s3_bucket_name if s3_key_prefix is not None: @@ -209,14 +213,14 @@ class Trail(BaseModel): if kms_key_id is not None: self.kms_key_id = kms_key_id - def short(self): + def short(self) -> Dict[str, str]: return { "Name": self.trail_name, "TrailARN": self.arn, "HomeRegion": self.region_name, } - def description(self, include_region=False): + def description(self, include_region: bool = False) -> Dict[str, Any]: desc = { "Name": self.trail_name, "S3BucketName": self.bucket_name, @@ -244,26 +248,26 @@ class Trail(BaseModel): class CloudTrailBackend(BaseBackend): """Implementation of CloudTrail APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.trails = dict() + self.trails: Dict[str, Trail] = dict() self.tagging_service = TaggingService(tag_name="TagsList") def create_trail( self, - name, - bucket_name, - s3_key_prefix, - sns_topic_name, - is_global, - is_multi_region, - log_validation, - is_org_trail, - cw_log_group_arn, - cw_role_arn, - kms_key_id, - tags_list, - ): + name: str, + bucket_name: str, + s3_key_prefix: str, + sns_topic_name: str, + is_global: bool, + is_multi_region: bool, + log_validation: bool, + is_org_trail: bool, + cw_log_group_arn: str, + cw_role_arn: str, + kms_key_id: str, + tags_list: List[Dict[str, str]], + ) -> Trail: trail = Trail( self.account_id, self.region_name, @@ -283,7 +287,7 @@ class CloudTrailBackend(BaseBackend): self.tagging_service.tag_resource(trail.arn, tags_list) return trail - def get_trail(self, name_or_arn): + def get_trail(self, name_or_arn: str) -> Trail: if len(name_or_arn) < 3: raise TrailNameTooShort(actual_length=len(name_or_arn)) if name_or_arn in self.trails: @@ -293,7 +297,7 @@ class CloudTrailBackend(BaseBackend): return trail raise TrailNotFoundException(account_id=self.account_id, name=name_or_arn) - def get_trail_status(self, name): + def get_trail_status(self, name: str) -> TrailStatus: if len(name) < 3: raise TrailNameTooShort(actual_length=len(name)) trail_name = next( @@ -313,7 +317,7 @@ class CloudTrailBackend(BaseBackend): trail = self.trails[trail_name] return trail.status - def describe_trails(self, include_shadow_trails): + def describe_trails(self, include_shadow_trails: bool) -> Iterable[Trail]: all_trails = [] if include_shadow_trails: current_account = cloudtrail_backends[self.account_id] @@ -325,35 +329,35 @@ class CloudTrailBackend(BaseBackend): all_trails.extend(self.trails.values()) return all_trails - def list_trails(self): + def list_trails(self) -> Iterable[Trail]: return self.describe_trails(include_shadow_trails=True) - def start_logging(self, name): + def start_logging(self, name: str) -> None: trail = self.trails[name] trail.start_logging() - def stop_logging(self, name): + def stop_logging(self, name: str) -> None: trail = self.trails[name] trail.stop_logging() - def delete_trail(self, name): + def delete_trail(self, name: str) -> None: if name in self.trails: del self.trails[name] def update_trail( self, - name, - s3_bucket_name, - s3_key_prefix, - sns_topic_name, - include_global_service_events, - is_multi_region_trail, - enable_log_file_validation, - is_organization_trail, - cw_log_group_arn, - cw_role_arn, - kms_key_id, - ): + name: str, + s3_bucket_name: str, + s3_key_prefix: str, + sns_topic_name: str, + include_global_service_events: bool, + is_multi_region_trail: bool, + enable_log_file_validation: bool, + is_organization_trail: bool, + cw_log_group_arn: str, + cw_role_arn: str, + kms_key_id: str, + ) -> Trail: trail = self.get_trail(name_or_arn=name) trail.update( s3_bucket_name=s3_bucket_name, @@ -370,41 +374,50 @@ class CloudTrailBackend(BaseBackend): return trail def put_event_selectors( - self, trail_name, event_selectors, advanced_event_selectors - ): + self, + trail_name: str, + event_selectors: List[Dict[str, Any]], + advanced_event_selectors: List[Dict[str, Any]], + ) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, Any]]]: trail = self.get_trail(trail_name) trail.put_event_selectors(event_selectors, advanced_event_selectors) trail_arn = trail.arn return trail_arn, event_selectors, advanced_event_selectors - def get_event_selectors(self, trail_name): + def get_event_selectors( + self, trail_name: str + ) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, Any]]]: trail = self.get_trail(trail_name) event_selectors, advanced_event_selectors = trail.get_event_selectors() return trail.arn, event_selectors, advanced_event_selectors - def add_tags(self, resource_id, tags_list): + def add_tags(self, resource_id: str, tags_list: List[Dict[str, str]]) -> None: self.tagging_service.tag_resource(resource_id, tags_list) - def remove_tags(self, resource_id, tags_list): + def remove_tags(self, resource_id: str, tags_list: List[Dict[str, str]]) -> None: self.tagging_service.untag_resource_using_tags(resource_id, tags_list) - def list_tags(self, resource_id_list): + def list_tags(self, resource_id_list: List[str]) -> List[Dict[str, Any]]: """ Pagination is not yet implemented """ - resp = [{"ResourceId": r_id} for r_id in resource_id_list] + resp: List[Dict[str, Any]] = [{"ResourceId": r_id} for r_id in resource_id_list] for item in resp: item["TagsList"] = self.tagging_service.list_tags_for_resource( item["ResourceId"] )["TagsList"] return resp - def put_insight_selectors(self, trail_name, insight_selectors): + def put_insight_selectors( + self, trail_name: str, insight_selectors: List[Dict[str, str]] + ) -> Tuple[str, List[Dict[str, str]]]: trail = self.get_trail(trail_name) trail.put_insight_selectors(insight_selectors) return trail.arn, insight_selectors - def get_insight_selectors(self, trail_name): + def get_insight_selectors( + self, trail_name: str + ) -> Tuple[str, List[Dict[str, str]]]: trail = self.get_trail(trail_name) return trail.arn, trail.get_insight_selectors() diff --git a/moto/cloudtrail/responses.py b/moto/cloudtrail/responses.py index a383238dd..fcdf2a675 100644 --- a/moto/cloudtrail/responses.py +++ b/moto/cloudtrail/responses.py @@ -1,23 +1,23 @@ """Handles incoming cloudtrail requests, invokes methods, returns responses.""" import json - +from typing import Any, Dict from moto.core.responses import BaseResponse -from .models import cloudtrail_backends +from .models import cloudtrail_backends, CloudTrailBackend from .exceptions import InvalidParameterCombinationException class CloudTrailResponse(BaseResponse): """Handler for CloudTrail requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="cloudtrail") @property - def cloudtrail_backend(self): + def cloudtrail_backend(self) -> CloudTrailBackend: """Return backend instance specific for this region.""" return cloudtrail_backends[self.current_account][self.region] - def create_trail(self): + def create_trail(self) -> str: name = self._get_param("Name") bucket_name = self._get_param("S3BucketName") is_global = self._get_bool_param("IncludeGlobalServiceEvents", True) @@ -50,43 +50,43 @@ class CloudTrailResponse(BaseResponse): ) return json.dumps(trail.description()) - def get_trail(self): + def get_trail(self) -> str: name = self._get_param("Name") trail = self.cloudtrail_backend.get_trail(name) return json.dumps({"Trail": trail.description()}) - def get_trail_status(self): + def get_trail_status(self) -> str: name = self._get_param("Name") status = self.cloudtrail_backend.get_trail_status(name) return json.dumps(status.description()) - def describe_trails(self): + def describe_trails(self) -> str: include_shadow_trails = self._get_bool_param("includeShadowTrails", True) trails = self.cloudtrail_backend.describe_trails(include_shadow_trails) return json.dumps( {"trailList": [t.description(include_region=True) for t in trails]} ) - def list_trails(self): + def list_trails(self) -> str: all_trails = self.cloudtrail_backend.list_trails() return json.dumps({"Trails": [t.short() for t in all_trails]}) - def start_logging(self): + def start_logging(self) -> str: name = self._get_param("Name") self.cloudtrail_backend.start_logging(name) return json.dumps({}) - def stop_logging(self): + def stop_logging(self) -> str: name = self._get_param("Name") self.cloudtrail_backend.stop_logging(name) return json.dumps({}) - def delete_trail(self): + def delete_trail(self) -> str: name = self._get_param("Name") self.cloudtrail_backend.delete_trail(name) return json.dumps({}) - def update_trail(self): + def update_trail(self) -> str: name = self._get_param("Name") s3_bucket_name = self._get_param("S3BucketName") s3_key_prefix = self._get_param("S3KeyPrefix") @@ -113,7 +113,7 @@ class CloudTrailResponse(BaseResponse): ) return json.dumps(trail.description()) - def put_event_selectors(self): + def put_event_selectors(self) -> str: params = json.loads(self.body) trail_name = params.get("TrailName") event_selectors = params.get("EventSelectors") @@ -135,7 +135,7 @@ class CloudTrailResponse(BaseResponse): ) ) - def get_event_selectors(self): + def get_event_selectors(self) -> str: params = json.loads(self.body) trail_name = params.get("TrailName") ( @@ -151,14 +151,14 @@ class CloudTrailResponse(BaseResponse): ) ) - def add_tags(self): + def add_tags(self) -> str: params = json.loads(self.body) resource_id = params.get("ResourceId") tags_list = params.get("TagsList") self.cloudtrail_backend.add_tags(resource_id=resource_id, tags_list=tags_list) return json.dumps(dict()) - def remove_tags(self): + def remove_tags(self) -> str: resource_id = self._get_param("ResourceId") tags_list = self._get_param("TagsList") self.cloudtrail_backend.remove_tags( @@ -166,7 +166,7 @@ class CloudTrailResponse(BaseResponse): ) return json.dumps(dict()) - def list_tags(self): + def list_tags(self) -> str: params = json.loads(self.body) resource_id_list = params.get("ResourceIdList") resource_tag_list = self.cloudtrail_backend.list_tags( @@ -174,7 +174,7 @@ class CloudTrailResponse(BaseResponse): ) return json.dumps(dict(ResourceTagList=resource_tag_list)) - def put_insight_selectors(self): + def put_insight_selectors(self) -> str: trail_name = self._get_param("TrailName") insight_selectors = self._get_param("InsightSelectors") trail_arn, insight_selectors = self.cloudtrail_backend.put_insight_selectors( @@ -182,12 +182,12 @@ class CloudTrailResponse(BaseResponse): ) return json.dumps(dict(TrailARN=trail_arn, InsightSelectors=insight_selectors)) - def get_insight_selectors(self): + def get_insight_selectors(self) -> str: trail_name = self._get_param("TrailName") trail_arn, insight_selectors = self.cloudtrail_backend.get_insight_selectors( trail_name=trail_name ) - resp = {"TrailARN": trail_arn} + resp: Dict[str, Any] = {"TrailARN": trail_arn} if insight_selectors: resp["InsightSelectors"] = insight_selectors return json.dumps(resp) diff --git a/moto/codebuild/exceptions.py b/moto/codebuild/exceptions.py index 84bfed944..e980afaa0 100644 --- a/moto/codebuild/exceptions.py +++ b/moto/codebuild/exceptions.py @@ -6,19 +6,19 @@ from moto.core.exceptions import JsonRESTError class InvalidInputException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("InvalidInputException", message) class ResourceNotFoundException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceNotFoundException", message) class ResourceAlreadyExistsException(JsonRESTError): code = 400 - def __init__(self, message): + def __init__(self, message: str): super().__init__("ResourceAlreadyExistsException", message) diff --git a/moto/codebuild/models.py b/moto/codebuild/models.py index 2f4635267..5565ad294 100644 --- a/moto/codebuild/models.py +++ b/moto/codebuild/models.py @@ -3,22 +3,23 @@ from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict from moto.moto_api._internal import mock_random from collections import defaultdict from dateutil import parser +from typing import Any, Dict, List, Optional import datetime class CodeBuildProjectMetadata(BaseModel): def __init__( self, - account_id, - region_name, - project_name, - source_version, - artifacts, - build_id, - service_role, + account_id: str, + region_name: str, + project_name: str, + source_version: Optional[str], + artifacts: Optional[Dict[str, Any]], + build_id: str, + service_role: str, ): current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) - self.build_metadata = dict() + self.build_metadata: Dict[str, Any] = dict() self.build_metadata["id"] = build_id self.build_metadata[ @@ -90,16 +91,16 @@ class CodeBuildProjectMetadata(BaseModel): class CodeBuild(BaseModel): def __init__( self, - account_id, - region, - project_name, - project_source, - artifacts, - environment, - serviceRole="some_role", + account_id: str, + region: str, + project_name: str, + project_source: Dict[str, Any], + artifacts: Dict[str, Any], + environment: Dict[str, Any], + serviceRole: str = "some_role", ): current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) - self.project_metadata = dict() + self.project_metadata: Dict[str, Any] = dict() self.project_metadata["name"] = project_name self.project_metadata["arn"] = "arn:aws:codebuild:{0}:{1}:project/{2}".format( @@ -127,16 +128,21 @@ class CodeBuild(BaseModel): class CodeBuildBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.codebuild_projects = dict() - self.build_history = dict() - self.build_metadata = dict() - self.build_metadata_history = defaultdict(list) + self.codebuild_projects: Dict[str, CodeBuild] = dict() + self.build_history: Dict[str, List[str]] = dict() + self.build_metadata: Dict[str, CodeBuildProjectMetadata] = dict() + self.build_metadata_history: Dict[str, List[Dict[str, Any]]] = defaultdict(list) def create_project( - self, project_name, project_source, artifacts, environment, service_role - ): + self, + project_name: str, + project_source: Dict[str, Any], + artifacts: Dict[str, Any], + environment: Dict[str, Any], + service_role: str, + ) -> Dict[str, Any]: # required in other functions that don't self.project_name = project_name self.service_role = service_role @@ -156,7 +162,7 @@ class CodeBuildBackend(BaseBackend): return self.codebuild_projects[project_name].project_metadata - def list_projects(self): + def list_projects(self) -> List[str]: projects = [] @@ -165,7 +171,12 @@ class CodeBuildBackend(BaseBackend): return projects - def start_build(self, project_name, source_version=None, artifact_override=None): + def start_build( + self, + project_name: str, + source_version: Optional[str] = None, + artifact_override: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: build_id = "{0}:{1}".format(project_name, mock_random.uuid4()) @@ -189,7 +200,7 @@ class CodeBuildBackend(BaseBackend): return self.build_metadata[project_name].build_metadata - def _set_phases(self, phases): + def _set_phases(self, phases: List[Dict[str, Any]]) -> List[Dict[str, Any]]: current_date = iso_8601_datetime_with_milliseconds(datetime.datetime.utcnow()) # No phaseStatus for QUEUED on first start for existing_phase in phases: @@ -209,7 +220,7 @@ class CodeBuildBackend(BaseBackend): ] for status in statuses: - phase = dict() + phase: Dict[str, Any] = dict() phase["phaseType"] = status phase["phaseStatus"] = "SUCCEEDED" phase["startTime"] = current_date @@ -219,8 +230,8 @@ class CodeBuildBackend(BaseBackend): return phases - def batch_get_builds(self, ids): - batch_build_metadata = [] + def batch_get_builds(self, ids: List[str]) -> List[Dict[str, Any]]: + batch_build_metadata: List[Dict[str, Any]] = [] for metadata in self.build_metadata_history.values(): for build in metadata: @@ -237,24 +248,24 @@ class CodeBuildBackend(BaseBackend): return batch_build_metadata - def list_builds_for_project(self, project_name): + def list_builds_for_project(self, project_name: str) -> List[str]: try: return self.build_history[project_name] except KeyError: return list() - def list_builds(self): + def list_builds(self) -> List[str]: ids = [] for build_ids in self.build_history.values(): ids += build_ids return ids - def delete_project(self, project_name): + def delete_project(self, project_name: str) -> None: self.build_metadata.pop(project_name, None) self.codebuild_projects.pop(project_name, None) - def stop_build(self, build_id): + def stop_build(self, build_id: str) -> Optional[Dict[str, Any]]: # type: ignore[return] for metadata in self.build_metadata_history.values(): for build in metadata: diff --git a/moto/codebuild/responses.py b/moto/codebuild/responses.py index ad60c1fb5..3f80f0726 100644 --- a/moto/codebuild/responses.py +++ b/moto/codebuild/responses.py @@ -1,5 +1,5 @@ from moto.core.responses import BaseResponse -from .models import codebuild_backends +from .models import codebuild_backends, CodeBuildBackend from .exceptions import ( InvalidInputException, ResourceAlreadyExistsException, @@ -7,9 +7,10 @@ from .exceptions import ( ) import json import re +from typing import Any, Dict, List -def _validate_required_params_source(source): +def _validate_required_params_source(source: Dict[str, Any]) -> None: if source["type"] not in [ "BITBUCKET", "CODECOMMIT", @@ -28,14 +29,14 @@ def _validate_required_params_source(source): raise InvalidInputException("Project source location is required") -def _validate_required_params_service_role(account_id, service_role): +def _validate_required_params_service_role(account_id: str, service_role: str) -> None: if f"arn:aws:iam::{account_id}:role/service-role/" not in service_role: raise InvalidInputException( "Invalid service role: Service role account ID does not match caller's account" ) -def _validate_required_params_artifacts(artifacts): +def _validate_required_params_artifacts(artifacts: Dict[str, Any]) -> None: if artifacts["type"] not in ["CODEPIPELINE", "S3", "NO_ARTIFACTS"]: raise InvalidInputException("Invalid type provided: Artifact type") @@ -49,7 +50,7 @@ def _validate_required_params_artifacts(artifacts): raise InvalidInputException("Project source location is required") -def _validate_required_params_environment(environment): +def _validate_required_params_environment(environment: Dict[str, Any]) -> None: if environment["type"] not in [ "WINDOWS_CONTAINER", @@ -72,7 +73,7 @@ def _validate_required_params_environment(environment): ) -def _validate_required_params_project_name(name): +def _validate_required_params_project_name(name: str) -> None: if len(name) >= 150: raise InvalidInputException( "Only alphanumeric characters, dash, and underscore are supported" @@ -84,7 +85,7 @@ def _validate_required_params_project_name(name): ) -def _validate_required_params_id(build_id, build_ids): +def _validate_required_params_id(build_id: str, build_ids: List[str]) -> None: if ":" not in build_id: raise InvalidInputException("Invalid build ID provided") @@ -94,10 +95,10 @@ def _validate_required_params_id(build_id, build_ids): class CodeBuildResponse(BaseResponse): @property - def codebuild_backend(self): + def codebuild_backend(self) -> CodeBuildBackend: return codebuild_backends[self.current_account][self.region] - def list_builds_for_project(self): + def list_builds_for_project(self) -> str: _validate_required_params_project_name(self._get_param("projectName")) if ( @@ -116,7 +117,7 @@ class CodeBuildResponse(BaseResponse): return json.dumps({"ids": ids}) - def create_project(self): + def create_project(self) -> str: _validate_required_params_source(self._get_param("source")) _validate_required_params_service_role( self.current_account, self._get_param("serviceRole") @@ -142,11 +143,11 @@ class CodeBuildResponse(BaseResponse): return json.dumps({"project": project_metadata}) - def list_projects(self): + def list_projects(self) -> str: project_metadata = self.codebuild_backend.list_projects() return json.dumps({"projects": project_metadata}) - def start_build(self): + def start_build(self) -> str: _validate_required_params_project_name(self._get_param("projectName")) if ( @@ -166,7 +167,7 @@ class CodeBuildResponse(BaseResponse): ) return json.dumps({"build": metadata}) - def batch_get_builds(self): + def batch_get_builds(self) -> str: for build_id in self._get_param("ids"): if ":" not in build_id: raise InvalidInputException("Invalid build ID provided") @@ -174,17 +175,17 @@ class CodeBuildResponse(BaseResponse): metadata = self.codebuild_backend.batch_get_builds(self._get_param("ids")) return json.dumps({"builds": metadata}) - def list_builds(self): + def list_builds(self) -> str: ids = self.codebuild_backend.list_builds() return json.dumps({"ids": ids}) - def delete_project(self): + def delete_project(self) -> str: _validate_required_params_project_name(self._get_param("name")) self.codebuild_backend.delete_project(self._get_param("name")) - return + return "{}" - def stop_build(self): + def stop_build(self) -> str: _validate_required_params_id( self._get_param("id"), self.codebuild_backend.list_builds() ) diff --git a/setup.cfg b/setup.cfg index 0085dd39e..8eabe4793 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront +files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild show_column_numbers=True show_error_codes = True disable_error_code=abstract