diff --git a/moto/eks/exceptions.py b/moto/eks/exceptions.py index f25a5fcd6..61ae17c3b 100644 --- a/moto/eks/exceptions.py +++ b/moto/eks/exceptions.py @@ -1,16 +1,17 @@ import json +from typing import Any, Dict, Tuple from moto.core.exceptions import AWSError class EKSError(AWSError): - def __init__(self, **kwargs): - super(AWSError, self).__init__(error_type=self.TYPE, message="") + def __init__(self, **kwargs: Any): + super(AWSError, self).__init__(error_type=self.TYPE, message="") # type: ignore self.description = json.dumps(kwargs) self.headers = {"status": self.STATUS, "x-amzn-ErrorType": self.TYPE} self.code = self.STATUS - def response(self): + def response(self) -> Tuple[int, Dict[str, Any], str]: # type: ignore[override] return self.STATUS, self.headers, self.description diff --git a/moto/eks/models.py b/moto/eks/models.py index 30247c069..74df856df 100644 --- a/moto/eks/models.py +++ b/moto/eks/models.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple, Iterator from moto.core import BaseBackend, BackendDict from moto.core.utils import iso_8601_datetime_without_milliseconds @@ -52,7 +53,7 @@ DEFAULT_AMI_TYPE = "AL2_x86_64" DEFAULT_CAPACITY_TYPE = "ON_DEMAND" DEFAULT_DISK_SIZE = "20" DEFAULT_INSTANCE_TYPES = ["t3.medium"] -DEFAULT_NODEGROUP_HEALTH = {"issues": []} +DEFAULT_NODEGROUP_HEALTH: Dict[str, Any] = {"issues": []} DEFAULT_RELEASE_VERSION = "1.19.8-20210414" DEFAULT_REMOTE_ACCESS = {"ec2SshKey": "eksKeypair"} DEFAULT_SCALING_CONFIG = {"minSize": 2, "maxSize": 2, "desiredSize": 2} @@ -91,28 +92,28 @@ NODEGROUP_NOT_FOUND_MSG = "No node group found for name: {nodegroupName}." class Cluster: def __init__( self, - name, - role_arn, - resources_vpc_config, - account_id, - region_name, - aws_partition, - version=None, - kubernetes_network_config=None, - logging=None, - client_request_token=None, - tags=None, - encryption_config=None, + name: str, + role_arn: str, + resources_vpc_config: Dict[str, Any], + account_id: str, + region_name: str, + aws_partition: str, + version: Optional[str] = None, + kubernetes_network_config: Optional[Dict[str, str]] = None, + logging: Optional[Dict[str, Any]] = None, + client_request_token: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + encryption_config: Optional[List[Dict[str, Any]]] = None, ): if encryption_config is None: encryption_config = [] if tags is None: tags = dict() - self.nodegroups = dict() + self.nodegroups: Dict[str, ManagedNodegroup] = dict() self.nodegroup_count = 0 - self.fargate_profiles = dict() + self.fargate_profiles: Dict[str, FargateProfile] = dict() self.fargate_profile_count = 0 self.arn = CLUSTER_ARN_TEMPLATE.format( @@ -141,7 +142,7 @@ class Cluster: self.role_arn = role_arn self.tags = tags - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "name", self.name yield "arn", self.arn yield "createdAt", self.creation_date @@ -159,23 +160,23 @@ class Cluster: yield "tags", self.tags yield "encryptionConfig", self.encryption_config - def isActive(self): + def isActive(self) -> bool: return self.status == "ACTIVE" class FargateProfile: def __init__( self, - cluster_name, - fargate_profile_name, - pod_execution_role_arn, - selectors, - account_id, - region_name, - aws_partition, - client_request_token=None, - subnets=None, - tags=None, + cluster_name: str, + fargate_profile_name: str, + pod_execution_role_arn: str, + selectors: List[Dict[str, Any]], + account_id: str, + region_name: str, + aws_partition: str, + client_request_token: Optional[str] = None, + subnets: Optional[List[str]] = None, + tags: Optional[Dict[str, str]] = None, ): if subnets is None: subnets = list() @@ -202,7 +203,7 @@ class FargateProfile: self.subnets = subnets self.tags = tags - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "clusterName", self.cluster_name yield "createdAt", self.created_at yield "fargateProfileArn", self.fargate_profile_arn @@ -217,33 +218,33 @@ class FargateProfile: class ManagedNodegroup: def __init__( self, - cluster_name, - node_role, - nodegroup_name, - subnets, - account_id, - region_name, - aws_partition, - scaling_config=None, - disk_size=None, - instance_types=None, - ami_type=None, - remote_access=None, - labels=None, - taints=None, - tags=None, - client_request_token=None, - launch_template=None, - capacity_type=None, - version=None, - release_version=None, + cluster_name: str, + node_role: str, + nodegroup_name: str, + subnets: List[str], + account_id: str, + region_name: str, + aws_partition: str, + scaling_config: Optional[Dict[str, int]] = None, + disk_size: Optional[int] = None, + instance_types: Optional[List[str]] = None, + ami_type: Optional[str] = None, + remote_access: Optional[Dict[str, Any]] = None, + labels: Optional[Dict[str, str]] = None, + taints: Optional[List[Dict[str, str]]] = None, + tags: Optional[Dict[str, str]] = None, + client_request_token: Optional[str] = None, + launch_template: Optional[Dict[str, str]] = None, + capacity_type: Optional[str] = None, + version: Optional[str] = None, + release_version: Optional[str] = None, ): if tags is None: tags = dict() if labels is None: labels = dict() if taints is None: - taints = dict() + taints = [] self.uuid = str(random.uuid4()) self.arn = NODEGROUP_ARN_TEMPLATE.format( @@ -291,19 +292,19 @@ class ManagedNodegroup: ec2 = ec2_backends[account_id][region_name] template = None - if "name" in self.launch_template: - name = self.launch_template["name"] + if "name" in self.launch_template: # type: ignore + name = self.launch_template["name"] # type: ignore template = ec2.describe_launch_templates(template_names=[name])[0] - elif "id" in self.launch_template: - _id = self.launch_template["id"] + elif "id" in self.launch_template: # type: ignore + _id = self.launch_template["id"] # type: ignore template = ec2.describe_launch_templates(template_ids=[_id])[0] - self.launch_template["id"] = template.id - self.launch_template["name"] = template.name + self.launch_template["id"] = template.id # type: ignore + self.launch_template["name"] = template.name # type: ignore except: # noqa: E722 Do not use bare except pass - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[str, Any]]: yield "nodegroupName", self.nodegroup_name yield "nodegroupArn", self.arn yield "clusterName", self.cluster_name @@ -329,24 +330,24 @@ class ManagedNodegroup: class EKSBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.clusters = dict() + self.clusters: Dict[str, Cluster] = dict() self.cluster_count = 0 self.partition = get_partition(region_name) def create_cluster( self, - name, - role_arn, - resources_vpc_config, - version=None, - kubernetes_network_config=None, - logging=None, - client_request_token=None, - tags=None, - encryption_config=None, - ): + name: str, + role_arn: str, + resources_vpc_config: Dict[str, Any], + version: Optional[str] = None, + kubernetes_network_config: Optional[Dict[str, str]] = None, + logging: Optional[Dict[str, Any]] = None, + client_request_token: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + encryption_config: Optional[List[Dict[str, Any]]] = None, + ) -> Cluster: if name in self.clusters: # Cluster exists. raise ResourceInUseException( @@ -377,14 +378,14 @@ class EKSBackend(BaseBackend): def create_fargate_profile( self, - fargate_profile_name, - cluster_name, - selectors, - pod_execution_role_arn, - subnets=None, - client_request_token=None, - tags=None, - ): + fargate_profile_name: str, + cluster_name: str, + selectors: List[Dict[str, Any]], + pod_execution_role_arn: str, + subnets: Optional[List[str]] = None, + client_request_token: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + ) -> FargateProfile: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -431,24 +432,24 @@ class EKSBackend(BaseBackend): def create_nodegroup( self, - cluster_name, - node_role, - nodegroup_name, - subnets, - scaling_config=None, - disk_size=None, - instance_types=None, - ami_type=None, - remote_access=None, - labels=None, - taints=None, - tags=None, - client_request_token=None, - launch_template=None, - capacity_type=None, - version=None, - release_version=None, - ): + cluster_name: str, + node_role: str, + nodegroup_name: str, + subnets: List[str], + scaling_config: Optional[Dict[str, int]] = None, + disk_size: Optional[int] = None, + instance_types: Optional[List[str]] = None, + ami_type: Optional[str] = None, + remote_access: Optional[Dict[str, Any]] = None, + labels: Optional[Dict[str, str]] = None, + taints: Optional[List[Dict[str, str]]] = None, + tags: Optional[Dict[str, str]] = None, + client_request_token: Optional[str] = None, + launch_template: Optional[Dict[str, str]] = None, + capacity_type: Optional[str] = None, + version: Optional[str] = None, + release_version: Optional[str] = None, + ) -> ManagedNodegroup: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -506,7 +507,7 @@ class EKSBackend(BaseBackend): cluster.nodegroup_count += 1 return nodegroup - def describe_cluster(self, name): + def describe_cluster(self, name: str) -> Cluster: try: # Cluster exists. return self.clusters[name] @@ -520,7 +521,9 @@ class EKSBackend(BaseBackend): message=CLUSTER_NOT_FOUND_MSG.format(clusterName=name), ) - def describe_fargate_profile(self, cluster_name, fargate_profile_name): + def describe_fargate_profile( + self, cluster_name: str, fargate_profile_name: str + ) -> FargateProfile: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -548,7 +551,9 @@ class EKSBackend(BaseBackend): ), ) - def describe_nodegroup(self, cluster_name, nodegroup_name): + def describe_nodegroup( + self, cluster_name: str, nodegroup_name: str + ) -> ManagedNodegroup: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -574,7 +579,7 @@ class EKSBackend(BaseBackend): message=NODEGROUP_NOT_FOUND_MSG.format(nodegroupName=nodegroup_name), ) - def delete_cluster(self, name): + def delete_cluster(self, name: str) -> Cluster: try: # Cluster exists. validate_safe_to_delete(self.clusters[name]) @@ -592,7 +597,9 @@ class EKSBackend(BaseBackend): self.cluster_count -= 1 return result - def delete_fargate_profile(self, cluster_name, fargate_profile_name): + def delete_fargate_profile( + self, cluster_name: str, fargate_profile_name: str + ) -> FargateProfile: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -623,7 +630,9 @@ class EKSBackend(BaseBackend): cluster.fargate_profile_count -= 1 return deleted_fargate_profile - def delete_nodegroup(self, cluster_name, nodegroup_name): + def delete_nodegroup( + self, cluster_name: str, nodegroup_name: str + ) -> ManagedNodegroup: try: # Cluster exists. cluster = self.clusters[cluster_name] @@ -652,7 +661,7 @@ class EKSBackend(BaseBackend): cluster.nodegroup_count -= 1 return result - def tag_resource(self, resource_arn, tags): + def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: """ This function currently will tag an EKS cluster only. It does not tag a managed node group """ @@ -673,9 +682,8 @@ class EKSBackend(BaseBackend): message="An error occurred (NotFoundException) when calling the TagResource operation: Resource was not found", ) cluster.tags.update(tags) - return "" - def untag_resource(self, resource_arn, tag_keys): + def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: """ This function currently will remove tags on an EKS cluster only. It does not remove tags from a managed node group """ @@ -700,9 +708,8 @@ class EKSBackend(BaseBackend): for name in tag_keys: if name in cluster.tags: del cluster.tags[name] - return "" - def list_tags_for_resource(self, resource_arn): + def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: """ This function currently will list tags on an EKS cluster only. It does not list tags from a managed node group """ @@ -724,19 +731,29 @@ class EKSBackend(BaseBackend): ) return cluster.tags - def list_clusters(self, max_results, next_token): - return paginated_list(self.clusters.keys(), max_results, next_token) + def list_clusters( + self, max_results: int, next_token: Optional[str] + ) -> Tuple[List[Cluster], Optional[Cluster]]: + return paginated_list(list(self.clusters.keys()), max_results, next_token) - def list_fargate_profiles(self, cluster_name, max_results, next_token): + def list_fargate_profiles( + self, cluster_name: str, max_results: int, next_token: Optional[str] + ) -> Tuple[List[FargateProfile], Optional[FargateProfile]]: cluster = self.clusters[cluster_name] - return paginated_list(cluster.fargate_profiles.keys(), max_results, next_token) + return paginated_list( + list(cluster.fargate_profiles.keys()), max_results, next_token + ) - def list_nodegroups(self, cluster_name, max_results, next_token): + def list_nodegroups( + self, cluster_name: str, max_results: int, next_token: Optional[str] + ) -> Tuple[List[ManagedNodegroup], Optional[ManagedNodegroup]]: cluster = self.clusters[cluster_name] - return paginated_list(cluster.nodegroups.keys(), max_results, next_token) + return paginated_list(list(cluster.nodegroups.keys()), max_results, next_token) -def paginated_list(full_list, max_results, next_token): +def paginated_list( + full_list: List[Any], max_results: int, next_token: Optional[str] +) -> Tuple[List[Any], Optional[Any]]: """ Returns a tuple containing a slice of the full list starting at next_token and ending with at most the @@ -754,7 +771,7 @@ def paginated_list(full_list, max_results, next_token): return sorted_list[start:end], new_next -def validate_safe_to_delete(cluster): +def validate_safe_to_delete(cluster: Cluster) -> None: # A cluster which has nodegroups attached can not be deleted. if cluster.nodegroup_count: nodegroup_names = ",".join(list(cluster.nodegroups.keys())) @@ -766,7 +783,9 @@ def validate_safe_to_delete(cluster): ) -def validate_launch_template_combination(disk_size, remote_access): +def validate_launch_template_combination( + disk_size: Optional[int], remote_access: Optional[Dict[str, Any]] +) -> None: if not (disk_size or remote_access): return @@ -777,8 +796,8 @@ def validate_launch_template_combination(disk_size, remote_access): ) -def _validate_fargate_profile_selectors(selectors): - def raise_exception(message): +def _validate_fargate_profile_selectors(selectors: List[Dict[str, Any]]) -> None: + def raise_exception(message: str) -> None: raise InvalidParameterException( clusterName=None, nodegroupName=None, diff --git a/moto/eks/responses.py b/moto/eks/responses.py index c0deb4845..b87054eab 100644 --- a/moto/eks/responses.py +++ b/moto/eks/responses.py @@ -1,22 +1,24 @@ import json +from typing import Any from urllib.parse import unquote +from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse -from .models import eks_backends +from .models import eks_backends, EKSBackend DEFAULT_MAX_RESULTS = 100 DEFAULT_NEXT_TOKEN = "" class EKSResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="eks") @property - def eks_backend(self): + def eks_backend(self) -> EKSBackend: return eks_backends[self.current_account][self.region] - def create_cluster(self): + def create_cluster(self) -> TYPE_RESPONSE: name = self._get_param("name") version = self._get_param("version") role_arn = self._get_param("roleArn") @@ -41,7 +43,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"cluster": dict(cluster)}) - def create_fargate_profile(self): + def create_fargate_profile(self) -> TYPE_RESPONSE: fargate_profile_name = self._get_param("fargateProfileName") cluster_name = self._get_param("name") pod_execution_role_arn = self._get_param("podExecutionRoleArn") @@ -62,7 +64,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) - def create_nodegroup(self): + def create_nodegroup(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") nodegroup_name = self._get_param("nodegroupName") scaling_config = self._get_param("scalingConfig") @@ -101,14 +103,14 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) - def describe_cluster(self): + def describe_cluster(self) -> TYPE_RESPONSE: name = self._get_param("name") cluster = self.eks_backend.describe_cluster(name=name) return 200, {}, json.dumps({"cluster": dict(cluster)}) - def describe_fargate_profile(self): + def describe_fargate_profile(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") fargate_profile_name = self._get_param("fargateProfileName") @@ -117,7 +119,7 @@ class EKSResponse(BaseResponse): ) return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) - def describe_nodegroup(self): + def describe_nodegroup(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") nodegroup_name = self._get_param("nodegroupName") @@ -127,7 +129,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) - def list_clusters(self): + def list_clusters(self) -> TYPE_RESPONSE: max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) @@ -137,7 +139,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps(dict(clusters=clusters, nextToken=next_token)) - def list_fargate_profiles(self): + def list_fargate_profiles(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) @@ -154,7 +156,7 @@ class EKSResponse(BaseResponse): ), ) - def list_nodegroups(self): + def list_nodegroups(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) @@ -165,14 +167,14 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps(dict(nodegroups=nodegroups, nextToken=next_token)) - def delete_cluster(self): + def delete_cluster(self) -> TYPE_RESPONSE: name = self._get_param("name") cluster = self.eks_backend.delete_cluster(name=name) return 200, {}, json.dumps({"cluster": dict(cluster)}) - def delete_fargate_profile(self): + def delete_fargate_profile(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") fargate_profile_name = self._get_param("fargateProfileName") @@ -182,7 +184,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) - def delete_nodegroup(self): + def delete_nodegroup(self) -> TYPE_RESPONSE: cluster_name = self._get_param("name") nodegroup_name = self._get_param("nodegroupName") @@ -192,7 +194,7 @@ class EKSResponse(BaseResponse): return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) - def tags(self, request, full_url, headers): + def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "GET": return self.list_tags_for_resource() @@ -201,25 +203,25 @@ class EKSResponse(BaseResponse): if request.method == "DELETE": return self.untag_resource() - def tag_resource(self): + def tag_resource(self) -> TYPE_RESPONSE: self.eks_backend.tag_resource( self._extract_arn_from_path(), self._get_param("tags") ) return 200, {}, "" - def untag_resource(self): + def untag_resource(self) -> TYPE_RESPONSE: self.eks_backend.untag_resource( self._extract_arn_from_path(), self._get_param("tagKeys") ) return 200, {}, "" - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> TYPE_RESPONSE: tags = self.eks_backend.list_tags_for_resource(self._extract_arn_from_path()) return 200, {}, json.dumps({"tags": tags}) - def _extract_arn_from_path(self): + def _extract_arn_from_path(self) -> str: # /tags/arn_that_may_contain_a_slash path = unquote(self.path) return "/".join(path.split("/")[2:]) diff --git a/moto/eks/utils.py b/moto/eks/utils.py index 686cd2aad..d7722dbc7 100644 --- a/moto/eks/utils.py +++ b/moto/eks/utils.py @@ -6,7 +6,7 @@ from boto3 import Session from moto.eks.exceptions import InvalidParameterException -def get_partition(region): +def get_partition(region: str) -> str: valid_matches = [ # (region prefix, aws partition) ("cn-", "aws-cn"), @@ -21,7 +21,7 @@ def get_partition(region): return "aws" -def method_name(use_parent=False): +def method_name(use_parent: bool = False) -> str: """ Returns the name of the method which called it from the stack in PascalCase. If `use_parent` is True, returns the parent of the method which called it instead. @@ -37,12 +37,12 @@ def method_name(use_parent=False): ) -def validate_role_arn(arn): +def validate_role_arn(arn: str) -> None: valid_role_arn_format = re.compile( "arn:(?P.+):iam::(?P[0-9]{12}):role/.+" ) match = valid_role_arn_format.match(arn) - valid_partition = match.group("partition") in Session().get_available_partitions() + valid_partition = match.group("partition") in Session().get_available_partitions() # type: ignore if not all({arn, match, valid_partition}): - raise InvalidParameterException("Invalid Role Arn: '" + arn + "'") + raise InvalidParameterException("Invalid Role Arn: '" + arn + "'") # type: ignore diff --git a/setup.cfg b/setup.cfg index de3c1ad34..228066350 100644 --- a/setup.cfg +++ b/setup.cfg @@ -229,7 +229,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/es,moto/moto_api +files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/es,moto/moto_api show_column_numbers=True show_error_codes = True disable_error_code=abstract