Techdebt: MyPy EKS (#5961)

This commit is contained in:
Bert Blommers 2023-02-22 10:19:58 -01:00 committed by GitHub
parent 03dfbe6931
commit eb3ed9106b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 145 deletions

View File

@ -1,16 +1,17 @@
import json import json
from typing import Any, Dict, Tuple
from moto.core.exceptions import AWSError from moto.core.exceptions import AWSError
class EKSError(AWSError): class EKSError(AWSError):
def __init__(self, **kwargs): def __init__(self, **kwargs: Any):
super(AWSError, self).__init__(error_type=self.TYPE, message="") super(AWSError, self).__init__(error_type=self.TYPE, message="") # type: ignore
self.description = json.dumps(kwargs) self.description = json.dumps(kwargs)
self.headers = {"status": self.STATUS, "x-amzn-ErrorType": self.TYPE} self.headers = {"status": self.STATUS, "x-amzn-ErrorType": self.TYPE}
self.code = self.STATUS self.code = self.STATUS
def response(self): def response(self) -> Tuple[int, Dict[str, Any], str]: # type: ignore[override]
return self.STATUS, self.headers, self.description return self.STATUS, self.headers, self.description

View File

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Iterator
from moto.core import BaseBackend, BackendDict from moto.core import BaseBackend, BackendDict
from moto.core.utils import iso_8601_datetime_without_milliseconds from moto.core.utils import iso_8601_datetime_without_milliseconds
@ -52,7 +53,7 @@ DEFAULT_AMI_TYPE = "AL2_x86_64"
DEFAULT_CAPACITY_TYPE = "ON_DEMAND" DEFAULT_CAPACITY_TYPE = "ON_DEMAND"
DEFAULT_DISK_SIZE = "20" DEFAULT_DISK_SIZE = "20"
DEFAULT_INSTANCE_TYPES = ["t3.medium"] DEFAULT_INSTANCE_TYPES = ["t3.medium"]
DEFAULT_NODEGROUP_HEALTH = {"issues": []} DEFAULT_NODEGROUP_HEALTH: Dict[str, Any] = {"issues": []}
DEFAULT_RELEASE_VERSION = "1.19.8-20210414" DEFAULT_RELEASE_VERSION = "1.19.8-20210414"
DEFAULT_REMOTE_ACCESS = {"ec2SshKey": "eksKeypair"} DEFAULT_REMOTE_ACCESS = {"ec2SshKey": "eksKeypair"}
DEFAULT_SCALING_CONFIG = {"minSize": 2, "maxSize": 2, "desiredSize": 2} DEFAULT_SCALING_CONFIG = {"minSize": 2, "maxSize": 2, "desiredSize": 2}
@ -91,28 +92,28 @@ NODEGROUP_NOT_FOUND_MSG = "No node group found for name: {nodegroupName}."
class Cluster: class Cluster:
def __init__( def __init__(
self, self,
name, name: str,
role_arn, role_arn: str,
resources_vpc_config, resources_vpc_config: Dict[str, Any],
account_id, account_id: str,
region_name, region_name: str,
aws_partition, aws_partition: str,
version=None, version: Optional[str] = None,
kubernetes_network_config=None, kubernetes_network_config: Optional[Dict[str, str]] = None,
logging=None, logging: Optional[Dict[str, Any]] = None,
client_request_token=None, client_request_token: Optional[str] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
encryption_config=None, encryption_config: Optional[List[Dict[str, Any]]] = None,
): ):
if encryption_config is None: if encryption_config is None:
encryption_config = [] encryption_config = []
if tags is None: if tags is None:
tags = dict() tags = dict()
self.nodegroups = dict() self.nodegroups: Dict[str, ManagedNodegroup] = dict()
self.nodegroup_count = 0 self.nodegroup_count = 0
self.fargate_profiles = dict() self.fargate_profiles: Dict[str, FargateProfile] = dict()
self.fargate_profile_count = 0 self.fargate_profile_count = 0
self.arn = CLUSTER_ARN_TEMPLATE.format( self.arn = CLUSTER_ARN_TEMPLATE.format(
@ -141,7 +142,7 @@ class Cluster:
self.role_arn = role_arn self.role_arn = role_arn
self.tags = tags self.tags = tags
def __iter__(self): def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "name", self.name yield "name", self.name
yield "arn", self.arn yield "arn", self.arn
yield "createdAt", self.creation_date yield "createdAt", self.creation_date
@ -159,23 +160,23 @@ class Cluster:
yield "tags", self.tags yield "tags", self.tags
yield "encryptionConfig", self.encryption_config yield "encryptionConfig", self.encryption_config
def isActive(self): def isActive(self) -> bool:
return self.status == "ACTIVE" return self.status == "ACTIVE"
class FargateProfile: class FargateProfile:
def __init__( def __init__(
self, self,
cluster_name, cluster_name: str,
fargate_profile_name, fargate_profile_name: str,
pod_execution_role_arn, pod_execution_role_arn: str,
selectors, selectors: List[Dict[str, Any]],
account_id, account_id: str,
region_name, region_name: str,
aws_partition, aws_partition: str,
client_request_token=None, client_request_token: Optional[str] = None,
subnets=None, subnets: Optional[List[str]] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
): ):
if subnets is None: if subnets is None:
subnets = list() subnets = list()
@ -202,7 +203,7 @@ class FargateProfile:
self.subnets = subnets self.subnets = subnets
self.tags = tags self.tags = tags
def __iter__(self): def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "clusterName", self.cluster_name yield "clusterName", self.cluster_name
yield "createdAt", self.created_at yield "createdAt", self.created_at
yield "fargateProfileArn", self.fargate_profile_arn yield "fargateProfileArn", self.fargate_profile_arn
@ -217,33 +218,33 @@ class FargateProfile:
class ManagedNodegroup: class ManagedNodegroup:
def __init__( def __init__(
self, self,
cluster_name, cluster_name: str,
node_role, node_role: str,
nodegroup_name, nodegroup_name: str,
subnets, subnets: List[str],
account_id, account_id: str,
region_name, region_name: str,
aws_partition, aws_partition: str,
scaling_config=None, scaling_config: Optional[Dict[str, int]] = None,
disk_size=None, disk_size: Optional[int] = None,
instance_types=None, instance_types: Optional[List[str]] = None,
ami_type=None, ami_type: Optional[str] = None,
remote_access=None, remote_access: Optional[Dict[str, Any]] = None,
labels=None, labels: Optional[Dict[str, str]] = None,
taints=None, taints: Optional[List[Dict[str, str]]] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
client_request_token=None, client_request_token: Optional[str] = None,
launch_template=None, launch_template: Optional[Dict[str, str]] = None,
capacity_type=None, capacity_type: Optional[str] = None,
version=None, version: Optional[str] = None,
release_version=None, release_version: Optional[str] = None,
): ):
if tags is None: if tags is None:
tags = dict() tags = dict()
if labels is None: if labels is None:
labels = dict() labels = dict()
if taints is None: if taints is None:
taints = dict() taints = []
self.uuid = str(random.uuid4()) self.uuid = str(random.uuid4())
self.arn = NODEGROUP_ARN_TEMPLATE.format( self.arn = NODEGROUP_ARN_TEMPLATE.format(
@ -291,19 +292,19 @@ class ManagedNodegroup:
ec2 = ec2_backends[account_id][region_name] ec2 = ec2_backends[account_id][region_name]
template = None template = None
if "name" in self.launch_template: if "name" in self.launch_template: # type: ignore
name = self.launch_template["name"] name = self.launch_template["name"] # type: ignore
template = ec2.describe_launch_templates(template_names=[name])[0] template = ec2.describe_launch_templates(template_names=[name])[0]
elif "id" in self.launch_template: elif "id" in self.launch_template: # type: ignore
_id = self.launch_template["id"] _id = self.launch_template["id"] # type: ignore
template = ec2.describe_launch_templates(template_ids=[_id])[0] template = ec2.describe_launch_templates(template_ids=[_id])[0]
self.launch_template["id"] = template.id self.launch_template["id"] = template.id # type: ignore
self.launch_template["name"] = template.name self.launch_template["name"] = template.name # type: ignore
except: # noqa: E722 Do not use bare except except: # noqa: E722 Do not use bare except
pass pass
def __iter__(self): def __iter__(self) -> Iterator[Tuple[str, Any]]:
yield "nodegroupName", self.nodegroup_name yield "nodegroupName", self.nodegroup_name
yield "nodegroupArn", self.arn yield "nodegroupArn", self.arn
yield "clusterName", self.cluster_name yield "clusterName", self.cluster_name
@ -329,24 +330,24 @@ class ManagedNodegroup:
class EKSBackend(BaseBackend): class EKSBackend(BaseBackend):
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.clusters = dict() self.clusters: Dict[str, Cluster] = dict()
self.cluster_count = 0 self.cluster_count = 0
self.partition = get_partition(region_name) self.partition = get_partition(region_name)
def create_cluster( def create_cluster(
self, self,
name, name: str,
role_arn, role_arn: str,
resources_vpc_config, resources_vpc_config: Dict[str, Any],
version=None, version: Optional[str] = None,
kubernetes_network_config=None, kubernetes_network_config: Optional[Dict[str, str]] = None,
logging=None, logging: Optional[Dict[str, Any]] = None,
client_request_token=None, client_request_token: Optional[str] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
encryption_config=None, encryption_config: Optional[List[Dict[str, Any]]] = None,
): ) -> Cluster:
if name in self.clusters: if name in self.clusters:
# Cluster exists. # Cluster exists.
raise ResourceInUseException( raise ResourceInUseException(
@ -377,14 +378,14 @@ class EKSBackend(BaseBackend):
def create_fargate_profile( def create_fargate_profile(
self, self,
fargate_profile_name, fargate_profile_name: str,
cluster_name, cluster_name: str,
selectors, selectors: List[Dict[str, Any]],
pod_execution_role_arn, pod_execution_role_arn: str,
subnets=None, subnets: Optional[List[str]] = None,
client_request_token=None, client_request_token: Optional[str] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
): ) -> FargateProfile:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -431,24 +432,24 @@ class EKSBackend(BaseBackend):
def create_nodegroup( def create_nodegroup(
self, self,
cluster_name, cluster_name: str,
node_role, node_role: str,
nodegroup_name, nodegroup_name: str,
subnets, subnets: List[str],
scaling_config=None, scaling_config: Optional[Dict[str, int]] = None,
disk_size=None, disk_size: Optional[int] = None,
instance_types=None, instance_types: Optional[List[str]] = None,
ami_type=None, ami_type: Optional[str] = None,
remote_access=None, remote_access: Optional[Dict[str, Any]] = None,
labels=None, labels: Optional[Dict[str, str]] = None,
taints=None, taints: Optional[List[Dict[str, str]]] = None,
tags=None, tags: Optional[Dict[str, str]] = None,
client_request_token=None, client_request_token: Optional[str] = None,
launch_template=None, launch_template: Optional[Dict[str, str]] = None,
capacity_type=None, capacity_type: Optional[str] = None,
version=None, version: Optional[str] = None,
release_version=None, release_version: Optional[str] = None,
): ) -> ManagedNodegroup:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -506,7 +507,7 @@ class EKSBackend(BaseBackend):
cluster.nodegroup_count += 1 cluster.nodegroup_count += 1
return nodegroup return nodegroup
def describe_cluster(self, name): def describe_cluster(self, name: str) -> Cluster:
try: try:
# Cluster exists. # Cluster exists.
return self.clusters[name] return self.clusters[name]
@ -520,7 +521,9 @@ class EKSBackend(BaseBackend):
message=CLUSTER_NOT_FOUND_MSG.format(clusterName=name), message=CLUSTER_NOT_FOUND_MSG.format(clusterName=name),
) )
def describe_fargate_profile(self, cluster_name, fargate_profile_name): def describe_fargate_profile(
self, cluster_name: str, fargate_profile_name: str
) -> FargateProfile:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -548,7 +551,9 @@ class EKSBackend(BaseBackend):
), ),
) )
def describe_nodegroup(self, cluster_name, nodegroup_name): def describe_nodegroup(
self, cluster_name: str, nodegroup_name: str
) -> ManagedNodegroup:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -574,7 +579,7 @@ class EKSBackend(BaseBackend):
message=NODEGROUP_NOT_FOUND_MSG.format(nodegroupName=nodegroup_name), message=NODEGROUP_NOT_FOUND_MSG.format(nodegroupName=nodegroup_name),
) )
def delete_cluster(self, name): def delete_cluster(self, name: str) -> Cluster:
try: try:
# Cluster exists. # Cluster exists.
validate_safe_to_delete(self.clusters[name]) validate_safe_to_delete(self.clusters[name])
@ -592,7 +597,9 @@ class EKSBackend(BaseBackend):
self.cluster_count -= 1 self.cluster_count -= 1
return result return result
def delete_fargate_profile(self, cluster_name, fargate_profile_name): def delete_fargate_profile(
self, cluster_name: str, fargate_profile_name: str
) -> FargateProfile:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -623,7 +630,9 @@ class EKSBackend(BaseBackend):
cluster.fargate_profile_count -= 1 cluster.fargate_profile_count -= 1
return deleted_fargate_profile return deleted_fargate_profile
def delete_nodegroup(self, cluster_name, nodegroup_name): def delete_nodegroup(
self, cluster_name: str, nodegroup_name: str
) -> ManagedNodegroup:
try: try:
# Cluster exists. # Cluster exists.
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
@ -652,7 +661,7 @@ class EKSBackend(BaseBackend):
cluster.nodegroup_count -= 1 cluster.nodegroup_count -= 1
return result return result
def tag_resource(self, resource_arn, tags): def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
""" """
This function currently will tag an EKS cluster only. It does not tag a managed node group This function currently will tag an EKS cluster only. It does not tag a managed node group
""" """
@ -673,9 +682,8 @@ class EKSBackend(BaseBackend):
message="An error occurred (NotFoundException) when calling the TagResource operation: Resource was not found", message="An error occurred (NotFoundException) when calling the TagResource operation: Resource was not found",
) )
cluster.tags.update(tags) cluster.tags.update(tags)
return ""
def untag_resource(self, resource_arn, tag_keys): def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
""" """
This function currently will remove tags on an EKS cluster only. It does not remove tags from a managed node group This function currently will remove tags on an EKS cluster only. It does not remove tags from a managed node group
""" """
@ -700,9 +708,8 @@ class EKSBackend(BaseBackend):
for name in tag_keys: for name in tag_keys:
if name in cluster.tags: if name in cluster.tags:
del cluster.tags[name] del cluster.tags[name]
return ""
def list_tags_for_resource(self, resource_arn): def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
""" """
This function currently will list tags on an EKS cluster only. It does not list tags from a managed node group This function currently will list tags on an EKS cluster only. It does not list tags from a managed node group
""" """
@ -724,19 +731,29 @@ class EKSBackend(BaseBackend):
) )
return cluster.tags return cluster.tags
def list_clusters(self, max_results, next_token): def list_clusters(
return paginated_list(self.clusters.keys(), max_results, next_token) self, max_results: int, next_token: Optional[str]
) -> Tuple[List[Cluster], Optional[Cluster]]:
return paginated_list(list(self.clusters.keys()), max_results, next_token)
def list_fargate_profiles(self, cluster_name, max_results, next_token): def list_fargate_profiles(
self, cluster_name: str, max_results: int, next_token: Optional[str]
) -> Tuple[List[FargateProfile], Optional[FargateProfile]]:
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
return paginated_list(cluster.fargate_profiles.keys(), max_results, next_token) return paginated_list(
list(cluster.fargate_profiles.keys()), max_results, next_token
)
def list_nodegroups(self, cluster_name, max_results, next_token): def list_nodegroups(
self, cluster_name: str, max_results: int, next_token: Optional[str]
) -> Tuple[List[ManagedNodegroup], Optional[ManagedNodegroup]]:
cluster = self.clusters[cluster_name] cluster = self.clusters[cluster_name]
return paginated_list(cluster.nodegroups.keys(), max_results, next_token) return paginated_list(list(cluster.nodegroups.keys()), max_results, next_token)
def paginated_list(full_list, max_results, next_token): def paginated_list(
full_list: List[Any], max_results: int, next_token: Optional[str]
) -> Tuple[List[Any], Optional[Any]]:
""" """
Returns a tuple containing a slice of the full list Returns a tuple containing a slice of the full list
starting at next_token and ending with at most the starting at next_token and ending with at most the
@ -754,7 +771,7 @@ def paginated_list(full_list, max_results, next_token):
return sorted_list[start:end], new_next return sorted_list[start:end], new_next
def validate_safe_to_delete(cluster): def validate_safe_to_delete(cluster: Cluster) -> None:
# A cluster which has nodegroups attached can not be deleted. # A cluster which has nodegroups attached can not be deleted.
if cluster.nodegroup_count: if cluster.nodegroup_count:
nodegroup_names = ",".join(list(cluster.nodegroups.keys())) nodegroup_names = ",".join(list(cluster.nodegroups.keys()))
@ -766,7 +783,9 @@ def validate_safe_to_delete(cluster):
) )
def validate_launch_template_combination(disk_size, remote_access): def validate_launch_template_combination(
disk_size: Optional[int], remote_access: Optional[Dict[str, Any]]
) -> None:
if not (disk_size or remote_access): if not (disk_size or remote_access):
return return
@ -777,8 +796,8 @@ def validate_launch_template_combination(disk_size, remote_access):
) )
def _validate_fargate_profile_selectors(selectors): def _validate_fargate_profile_selectors(selectors: List[Dict[str, Any]]) -> None:
def raise_exception(message): def raise_exception(message: str) -> None:
raise InvalidParameterException( raise InvalidParameterException(
clusterName=None, clusterName=None,
nodegroupName=None, nodegroupName=None,

View File

@ -1,22 +1,24 @@
import json import json
from typing import Any
from urllib.parse import unquote from urllib.parse import unquote
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import eks_backends from .models import eks_backends, EKSBackend
DEFAULT_MAX_RESULTS = 100 DEFAULT_MAX_RESULTS = 100
DEFAULT_NEXT_TOKEN = "" DEFAULT_NEXT_TOKEN = ""
class EKSResponse(BaseResponse): class EKSResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="eks") super().__init__(service_name="eks")
@property @property
def eks_backend(self): def eks_backend(self) -> EKSBackend:
return eks_backends[self.current_account][self.region] return eks_backends[self.current_account][self.region]
def create_cluster(self): def create_cluster(self) -> TYPE_RESPONSE:
name = self._get_param("name") name = self._get_param("name")
version = self._get_param("version") version = self._get_param("version")
role_arn = self._get_param("roleArn") role_arn = self._get_param("roleArn")
@ -41,7 +43,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"cluster": dict(cluster)}) return 200, {}, json.dumps({"cluster": dict(cluster)})
def create_fargate_profile(self): def create_fargate_profile(self) -> TYPE_RESPONSE:
fargate_profile_name = self._get_param("fargateProfileName") fargate_profile_name = self._get_param("fargateProfileName")
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
pod_execution_role_arn = self._get_param("podExecutionRoleArn") pod_execution_role_arn = self._get_param("podExecutionRoleArn")
@ -62,7 +64,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)})
def create_nodegroup(self): def create_nodegroup(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
nodegroup_name = self._get_param("nodegroupName") nodegroup_name = self._get_param("nodegroupName")
scaling_config = self._get_param("scalingConfig") scaling_config = self._get_param("scalingConfig")
@ -101,14 +103,14 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) return 200, {}, json.dumps({"nodegroup": dict(nodegroup)})
def describe_cluster(self): def describe_cluster(self) -> TYPE_RESPONSE:
name = self._get_param("name") name = self._get_param("name")
cluster = self.eks_backend.describe_cluster(name=name) cluster = self.eks_backend.describe_cluster(name=name)
return 200, {}, json.dumps({"cluster": dict(cluster)}) return 200, {}, json.dumps({"cluster": dict(cluster)})
def describe_fargate_profile(self): def describe_fargate_profile(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
fargate_profile_name = self._get_param("fargateProfileName") fargate_profile_name = self._get_param("fargateProfileName")
@ -117,7 +119,7 @@ class EKSResponse(BaseResponse):
) )
return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)})
def describe_nodegroup(self): def describe_nodegroup(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
nodegroup_name = self._get_param("nodegroupName") nodegroup_name = self._get_param("nodegroupName")
@ -127,7 +129,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) return 200, {}, json.dumps({"nodegroup": dict(nodegroup)})
def list_clusters(self): def list_clusters(self) -> TYPE_RESPONSE:
max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
@ -137,7 +139,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps(dict(clusters=clusters, nextToken=next_token)) return 200, {}, json.dumps(dict(clusters=clusters, nextToken=next_token))
def list_fargate_profiles(self): def list_fargate_profiles(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
@ -154,7 +156,7 @@ class EKSResponse(BaseResponse):
), ),
) )
def list_nodegroups(self): def list_nodegroups(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS) max_results = self._get_int_param("maxResults", DEFAULT_MAX_RESULTS)
next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN) next_token = self._get_param("nextToken", DEFAULT_NEXT_TOKEN)
@ -165,14 +167,14 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps(dict(nodegroups=nodegroups, nextToken=next_token)) return 200, {}, json.dumps(dict(nodegroups=nodegroups, nextToken=next_token))
def delete_cluster(self): def delete_cluster(self) -> TYPE_RESPONSE:
name = self._get_param("name") name = self._get_param("name")
cluster = self.eks_backend.delete_cluster(name=name) cluster = self.eks_backend.delete_cluster(name=name)
return 200, {}, json.dumps({"cluster": dict(cluster)}) return 200, {}, json.dumps({"cluster": dict(cluster)})
def delete_fargate_profile(self): def delete_fargate_profile(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
fargate_profile_name = self._get_param("fargateProfileName") fargate_profile_name = self._get_param("fargateProfileName")
@ -182,7 +184,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)}) return 200, {}, json.dumps({"fargateProfile": dict(fargate_profile)})
def delete_nodegroup(self): def delete_nodegroup(self) -> TYPE_RESPONSE:
cluster_name = self._get_param("name") cluster_name = self._get_param("name")
nodegroup_name = self._get_param("nodegroupName") nodegroup_name = self._get_param("nodegroupName")
@ -192,7 +194,7 @@ class EKSResponse(BaseResponse):
return 200, {}, json.dumps({"nodegroup": dict(nodegroup)}) return 200, {}, json.dumps({"nodegroup": dict(nodegroup)})
def tags(self, request, full_url, headers): def tags(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
if request.method == "GET": if request.method == "GET":
return self.list_tags_for_resource() return self.list_tags_for_resource()
@ -201,25 +203,25 @@ class EKSResponse(BaseResponse):
if request.method == "DELETE": if request.method == "DELETE":
return self.untag_resource() return self.untag_resource()
def tag_resource(self): def tag_resource(self) -> TYPE_RESPONSE:
self.eks_backend.tag_resource( self.eks_backend.tag_resource(
self._extract_arn_from_path(), self._get_param("tags") self._extract_arn_from_path(), self._get_param("tags")
) )
return 200, {}, "" return 200, {}, ""
def untag_resource(self): def untag_resource(self) -> TYPE_RESPONSE:
self.eks_backend.untag_resource( self.eks_backend.untag_resource(
self._extract_arn_from_path(), self._get_param("tagKeys") self._extract_arn_from_path(), self._get_param("tagKeys")
) )
return 200, {}, "" return 200, {}, ""
def list_tags_for_resource(self): def list_tags_for_resource(self) -> TYPE_RESPONSE:
tags = self.eks_backend.list_tags_for_resource(self._extract_arn_from_path()) tags = self.eks_backend.list_tags_for_resource(self._extract_arn_from_path())
return 200, {}, json.dumps({"tags": tags}) return 200, {}, json.dumps({"tags": tags})
def _extract_arn_from_path(self): def _extract_arn_from_path(self) -> str:
# /tags/arn_that_may_contain_a_slash # /tags/arn_that_may_contain_a_slash
path = unquote(self.path) path = unquote(self.path)
return "/".join(path.split("/")[2:]) return "/".join(path.split("/")[2:])

View File

@ -6,7 +6,7 @@ from boto3 import Session
from moto.eks.exceptions import InvalidParameterException from moto.eks.exceptions import InvalidParameterException
def get_partition(region): def get_partition(region: str) -> str:
valid_matches = [ valid_matches = [
# (region prefix, aws partition) # (region prefix, aws partition)
("cn-", "aws-cn"), ("cn-", "aws-cn"),
@ -21,7 +21,7 @@ def get_partition(region):
return "aws" return "aws"
def method_name(use_parent=False): def method_name(use_parent: bool = False) -> str:
""" """
Returns the name of the method which called it from the stack in PascalCase. Returns the name of the method which called it from the stack in PascalCase.
If `use_parent` is True, returns the parent of the method which called it instead. If `use_parent` is True, returns the parent of the method which called it instead.
@ -37,12 +37,12 @@ def method_name(use_parent=False):
) )
def validate_role_arn(arn): def validate_role_arn(arn: str) -> None:
valid_role_arn_format = re.compile( valid_role_arn_format = re.compile(
"arn:(?P<partition>.+):iam::(?P<account_id>[0-9]{12}):role/.+" "arn:(?P<partition>.+):iam::(?P<account_id>[0-9]{12}):role/.+"
) )
match = valid_role_arn_format.match(arn) match = valid_role_arn_format.match(arn)
valid_partition = match.group("partition") in Session().get_available_partitions() valid_partition = match.group("partition") in Session().get_available_partitions() # type: ignore
if not all({arn, match, valid_partition}): if not all({arn, match, valid_partition}):
raise InvalidParameterException("Invalid Role Arn: '" + arn + "'") raise InvalidParameterException("Invalid Role Arn: '" + arn + "'") # type: ignore

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy] [mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/es,moto/moto_api files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/efs,moto/eks,moto/es,moto/moto_api
show_column_numbers=True show_column_numbers=True
show_error_codes = True show_error_codes = True
disable_error_code=abstract disable_error_code=abstract