Techdebt: MyPy ECS (#5944)

This commit is contained in:
Bert Blommers 2023-02-19 10:20:28 -01:00 committed by GitHub
parent 1c8511656e
commit efb13a17a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 496 additions and 412 deletions

View File

@ -4,7 +4,7 @@ from moto.core.exceptions import RESTError, JsonRESTError
class ServiceNotFoundException(RESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
error_type="ServiceNotFoundException", message="Service not found."
)
@ -13,7 +13,7 @@ class ServiceNotFoundException(RESTError):
class TaskDefinitionNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
error_type="ClientException",
message="Unable to describe task definition.",
@ -23,14 +23,14 @@ class TaskDefinitionNotFoundException(JsonRESTError):
class RevisionNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(error_type="ClientException", message="Revision is missing.")
class TaskSetNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
error_type="ClientException",
message="The specified task set does not exist.",
@ -40,7 +40,7 @@ class TaskSetNotFoundException(JsonRESTError):
class ClusterNotFoundException(JsonRESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
error_type="ClusterNotFoundException", message="Cluster not found."
)
@ -49,19 +49,19 @@ class ClusterNotFoundException(JsonRESTError):
class EcsClientException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__(error_type="ClientException", message=message)
class InvalidParameterException(JsonRESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__(error_type="InvalidParameterException", message=message)
class UnknownAccountSettingException(InvalidParameterException):
def __init__(self):
def __init__(self) -> None:
super().__init__(
"unknown should be one of [serviceLongArnFormat,taskLongArnFormat,containerInstanceLongArnFormat,containerLongArnFormat,awsvpcTrunking,containerInsights,dualStackIPv6]"
)

File diff suppressed because it is too large Load Diff

View File

@ -1,35 +1,26 @@
import json
from typing import Any, Dict
from moto.core.responses import BaseResponse
from .models import ecs_backends, EC2ContainerServiceBackend
class EC2ContainerServiceResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="ecs")
@property
def ecs_backend(self) -> EC2ContainerServiceBackend:
return ecs_backends[self.current_account][self.region]
@property
def request_params(self):
try:
return json.loads(self.body)
except ValueError:
return {}
def _get_param(self, param_name, if_none=None):
return self.request_params.get(param_name, if_none)
def create_capacity_provider(self):
def create_capacity_provider(self) -> str:
name = self._get_param("name")
asg_provider = self._get_param("autoScalingGroupProvider")
tags = self._get_param("tags")
provider = self.ecs_backend.create_capacity_provider(name, asg_provider, tags)
return json.dumps({"capacityProvider": provider.response_object})
def create_cluster(self):
def create_cluster(self) -> str:
cluster_name = self._get_param("clusterName")
tags = self._get_param("tags")
settings = self._get_param("settings")
@ -50,23 +41,18 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"cluster": cluster.response_object})
def list_clusters(self):
def list_clusters(self) -> str:
cluster_arns = self.ecs_backend.list_clusters()
return json.dumps(
{
"clusterArns": cluster_arns
# 'nextToken': str(uuid.uuid4())
}
)
return json.dumps({"clusterArns": cluster_arns})
def update_cluster(self):
def update_cluster(self) -> str:
cluster_name = self._get_param("cluster")
settings = self._get_param("settings")
configuration = self._get_param("configuration")
cluster = self.ecs_backend.update_cluster(cluster_name, settings, configuration)
return json.dumps({"cluster": cluster.response_object})
def put_cluster_capacity_providers(self):
def put_cluster_capacity_providers(self) -> str:
cluster_name = self._get_param("cluster")
capacity_providers = self._get_param("capacityProviders")
default_capacity_provider_strategy = self._get_param(
@ -77,18 +63,18 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"cluster": cluster.response_object})
def delete_capacity_provider(self):
def delete_capacity_provider(self) -> str:
name = self._get_param("capacityProvider")
provider = self.ecs_backend.delete_capacity_provider(name)
return json.dumps({"capacityProvider": provider.response_object})
def update_capacity_provider(self):
def update_capacity_provider(self) -> str:
name = self._get_param("name")
asg_provider = self._get_param("autoScalingGroupProvider")
provider = self.ecs_backend.update_capacity_provider(name, asg_provider)
return json.dumps({"capacityProvider": provider.response_object})
def describe_capacity_providers(self):
def describe_capacity_providers(self) -> str:
names = self._get_param("capacityProviders")
providers, failures = self.ecs_backend.describe_capacity_providers(names)
return json.dumps(
@ -98,7 +84,7 @@ class EC2ContainerServiceResponse(BaseResponse):
}
)
def describe_clusters(self):
def describe_clusters(self) -> str:
names = self._get_param("clusters")
include = self._get_param("include")
clusters, failures = self.ecs_backend.describe_clusters(names, include)
@ -109,12 +95,12 @@ class EC2ContainerServiceResponse(BaseResponse):
}
)
def delete_cluster(self):
def delete_cluster(self) -> str:
cluster_str = self._get_param("cluster")
cluster = self.ecs_backend.delete_cluster(cluster_str)
return json.dumps({"cluster": cluster.response_object})
def register_task_definition(self):
def register_task_definition(self) -> str:
family = self._get_param("family")
container_definitions = self._get_param("containerDefinitions")
volumes = self._get_param("volumes")
@ -154,7 +140,7 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"taskDefinition": task_definition.response_object})
def list_task_definitions(self):
def list_task_definitions(self) -> str:
family_prefix = self._get_param("familyPrefix")
task_definition_arns = self.ecs_backend.list_task_definitions(family_prefix)
return json.dumps(
@ -164,22 +150,22 @@ class EC2ContainerServiceResponse(BaseResponse):
}
)
def describe_task_definition(self):
def describe_task_definition(self) -> str:
task_definition_str = self._get_param("taskDefinition")
data = self.ecs_backend.describe_task_definition(task_definition_str)
resp = {"taskDefinition": data.response_object, "failures": []}
resp: Dict[str, Any] = {"taskDefinition": data.response_object, "failures": []}
if "TAGS" in self._get_param("include", []):
resp["tags"] = self.ecs_backend.list_tags_for_resource(data.arn)
return json.dumps(resp)
def deregister_task_definition(self):
def deregister_task_definition(self) -> str:
task_definition_str = self._get_param("taskDefinition")
task_definition = self.ecs_backend.deregister_task_definition(
task_definition_str
)
return json.dumps({"taskDefinition": task_definition.response_object})
def run_task(self):
def run_task(self) -> str:
cluster_str = self._get_param("cluster", "default")
overrides = self._get_param("overrides")
task_definition_str = self._get_param("taskDefinition")
@ -202,7 +188,7 @@ class EC2ContainerServiceResponse(BaseResponse):
{"tasks": [task.response_object for task in tasks], "failures": []}
)
def describe_tasks(self):
def describe_tasks(self) -> str:
cluster = self._get_param("cluster", "default")
tasks = self._get_param("tasks")
include = self._get_param("include")
@ -211,7 +197,7 @@ class EC2ContainerServiceResponse(BaseResponse):
{"tasks": [task.response_object for task in data], "failures": []}
)
def start_task(self):
def start_task(self) -> str:
cluster_str = self._get_param("cluster", "default")
overrides = self._get_param("overrides")
task_definition_str = self._get_param("taskDefinition")
@ -230,7 +216,7 @@ class EC2ContainerServiceResponse(BaseResponse):
{"tasks": [task.response_object for task in tasks], "failures": []}
)
def list_tasks(self):
def list_tasks(self) -> str:
cluster_str = self._get_param("cluster", "default")
container_instance = self._get_param("containerInstance")
family = self._get_param("family")
@ -247,14 +233,14 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"taskArns": task_arns})
def stop_task(self):
def stop_task(self) -> str:
cluster_str = self._get_param("cluster", "default")
task = self._get_param("task")
reason = self._get_param("reason")
task = self.ecs_backend.stop_task(cluster_str, task, reason)
return json.dumps({"task": task.response_object})
def create_service(self):
def create_service(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_name = self._get_param("serviceName")
task_definition_str = self._get_param("taskDefinition")
@ -281,22 +267,16 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"service": service.response_object})
def list_services(self):
def list_services(self) -> str:
cluster_str = self._get_param("cluster", "default")
scheduling_strategy = self._get_param("schedulingStrategy")
launch_type = self._get_param("launchType")
service_arns = self.ecs_backend.list_services(
cluster_str, scheduling_strategy, launch_type=launch_type
)
return json.dumps(
{
"serviceArns": service_arns
# ,
# 'nextToken': str(uuid.uuid4())
}
)
return json.dumps({"serviceArns": service_arns})
def describe_services(self):
def describe_services(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_names = self._get_param("services")
services, failures = self.ecs_backend.describe_services(
@ -313,7 +293,7 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps(resp)
def update_service(self):
def update_service(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_name = self._get_param("service")
task_definition = self._get_param("taskDefinition")
@ -323,14 +303,14 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"service": service.response_object})
def delete_service(self):
def delete_service(self) -> str:
service_name = self._get_param("service")
cluster_name = self._get_param("cluster", "default")
force = self._get_param("force", False)
service = self.ecs_backend.delete_service(cluster_name, service_name, force)
return json.dumps({"service": service.response_object})
def register_container_instance(self):
def register_container_instance(self) -> str:
cluster_str = self._get_param("cluster", "default")
instance_identity_document_str = self._get_param("instanceIdentityDocument")
instance_identity_document = json.loads(instance_identity_document_str)
@ -340,7 +320,7 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"containerInstance": container_instance.response_object})
def deregister_container_instance(self):
def deregister_container_instance(self) -> str:
cluster_str = self._get_param("cluster", "default")
container_instance_str = self._get_param("containerInstance")
force = self._get_param("force")
@ -349,12 +329,12 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"containerInstance": container_instance.response_object})
def list_container_instances(self):
def list_container_instances(self) -> str:
cluster_str = self._get_param("cluster", "default")
container_instance_arns = self.ecs_backend.list_container_instances(cluster_str)
return json.dumps({"containerInstanceArns": container_instance_arns})
def describe_container_instances(self):
def describe_container_instances(self) -> str:
cluster_str = self._get_param("cluster", "default")
list_container_instance_arns = self._get_param("containerInstances")
container_instances, failures = self.ecs_backend.describe_container_instances(
@ -369,7 +349,7 @@ class EC2ContainerServiceResponse(BaseResponse):
}
)
def update_container_instances_state(self):
def update_container_instances_state(self) -> str:
cluster_str = self._get_param("cluster", "default")
list_container_instance_arns = self._get_param("containerInstances")
status_str = self._get_param("status")
@ -388,7 +368,7 @@ class EC2ContainerServiceResponse(BaseResponse):
}
)
def put_attributes(self):
def put_attributes(self) -> str:
cluster_name = self._get_param("cluster")
attributes = self._get_param("attributes")
@ -396,7 +376,7 @@ class EC2ContainerServiceResponse(BaseResponse):
return json.dumps({"attributes": attributes})
def list_attributes(self):
def list_attributes(self) -> str:
cluster_name = self._get_param("cluster")
attr_name = self._get_param("attributeName")
attr_value = self._get_param("attributeValue")
@ -416,7 +396,7 @@ class EC2ContainerServiceResponse(BaseResponse):
return json.dumps({"attributes": formatted_results})
def delete_attributes(self):
def delete_attributes(self) -> str:
cluster_name = self._get_param("cluster", "default")
attributes = self._get_param("attributes")
@ -424,7 +404,7 @@ class EC2ContainerServiceResponse(BaseResponse):
return json.dumps({"attributes": attributes})
def discover_poll_endpoint(self):
def discover_poll_endpoint(self) -> str:
# Here are the arguments, this api is used by the ecs client so obviously no decent
# documentation. Hence I've responded with valid but useless data
# cluster_name = self._get_param('cluster')
@ -433,30 +413,30 @@ class EC2ContainerServiceResponse(BaseResponse):
{"endpoint": "http://localhost", "telemetryEndpoint": "http://localhost"}
)
def list_task_definition_families(self):
def list_task_definition_families(self) -> str:
family_prefix = self._get_param("familyPrefix")
results = self.ecs_backend.list_task_definition_families(family_prefix)
return json.dumps({"families": list(results)})
def list_tags_for_resource(self):
def list_tags_for_resource(self) -> str:
resource_arn = self._get_param("resourceArn")
tags = self.ecs_backend.list_tags_for_resource(resource_arn)
return json.dumps({"tags": tags})
def tag_resource(self):
def tag_resource(self) -> str:
resource_arn = self._get_param("resourceArn")
tags = self._get_param("tags")
self.ecs_backend.tag_resource(resource_arn, tags)
return json.dumps({})
def untag_resource(self):
def untag_resource(self) -> str:
resource_arn = self._get_param("resourceArn")
tag_keys = self._get_param("tagKeys")
self.ecs_backend.untag_resource(resource_arn, tag_keys)
return json.dumps({})
def create_task_set(self):
def create_task_set(self) -> str:
service_str = self._get_param("service")
cluster_str = self._get_param("cluster", "default")
task_definition = self._get_param("taskDefinition")
@ -487,13 +467,13 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"taskSet": task_set.response_object})
def describe_task_sets(self):
def describe_task_sets(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_str = self._get_param("service")
task_sets = self._get_param("taskSets")
include = self._get_param("include", [])
task_set_objs = self.ecs_backend.describe_task_sets(
cluster_str, service_str, task_sets, include
cluster_str, service_str, task_sets
)
response_objs = [t.response_object for t in task_set_objs]
@ -502,14 +482,14 @@ class EC2ContainerServiceResponse(BaseResponse):
del ro["tags"]
return json.dumps({"taskSets": response_objs})
def delete_task_set(self):
def delete_task_set(self) -> str:
cluster_str = self._get_param("cluster")
service_str = self._get_param("service")
task_set = self._get_param("taskSet")
task_set = self.ecs_backend.delete_task_set(cluster_str, service_str, task_set)
return json.dumps({"taskSet": task_set.response_object})
def update_task_set(self):
def update_task_set(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_str = self._get_param("service")
task_set = self._get_param("taskSet")
@ -520,7 +500,7 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"taskSet": task_set.response_object})
def update_service_primary_task_set(self):
def update_service_primary_task_set(self) -> str:
cluster_str = self._get_param("cluster", "default")
service_str = self._get_param("service")
primary_task_set = self._get_param("primaryTaskSet")
@ -530,19 +510,19 @@ class EC2ContainerServiceResponse(BaseResponse):
)
return json.dumps({"taskSet": task_set.response_object})
def put_account_setting(self):
def put_account_setting(self) -> str:
name = self._get_param("name")
value = self._get_param("value")
account_setting = self.ecs_backend.put_account_setting(name, value)
return json.dumps({"setting": account_setting.response_object})
def list_account_settings(self):
def list_account_settings(self) -> str:
name = self._get_param("name")
value = self._get_param("value")
account_settings = self.ecs_backend.list_account_settings(name, value)
return json.dumps({"settings": [s.response_object for s in account_settings]})
def delete_account_setting(self):
def delete_account_setting(self) -> str:
name = self._get_param("name")
self.ecs_backend.delete_account_setting(name)
return "{}"

View File

@ -60,7 +60,7 @@ def get_s3_default_key_buffer_size():
)
def ecs_new_arn_format():
def ecs_new_arn_format() -> bool:
# True by default - only the value 'false' will return false
return os.environ.get("MOTO_ECS_NEW_ARN", "true").lower() != "false"

View File

@ -229,7 +229,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/es,moto/moto_api
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/ebs/,moto/ec2,moto/ec2instanceconnect,moto/ecr,moto/ecs,moto/es,moto/moto_api
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract