moto/moto/batch/models.py


from __future__ import unicode_literals
import boto3
import re
import requests.adapters
from itertools import cycle
import six
import datetime
import time
import uuid
import logging
import docker
import functools
import threading
import dateutil.parser
from moto.core import BaseBackend, BaseModel
from moto.iam import iam_backends
from moto.ec2 import ec2_backends
from moto.ecs import ecs_backends
from moto.logs import logs_backends
from .exceptions import InvalidParameterValueException, InternalFailure, ClientException
from .utils import (
make_arn_for_compute_env,
make_arn_for_job_queue,
make_arn_for_task_def,
lowercase_first_key,
)
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES
from moto.iam.exceptions import IAMNotFoundException
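# Keep a reference to the real HTTPAdapter.send so the docker client can still
# reach the Docker daemon while requests is mocked out (restored in Job.__init__).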
_orig_adapter_send = requests.adapters.HTTPAdapter.send
logger = logging.getLogger(__name__)
DEFAULT_ACCOUNT_ID = 123456789012
COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(
r"^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$"
)
def datetime2int(date):
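    """Convert a datetime to an integer POSIX timestamp (in seconds)."""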
return int(time.mktime(date.timetuple()))
class ComputeEnvironment(BaseModel):
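    """A mocked Batch compute environment; MANAGED environments are backed by mock EC2 instances and an ECS cluster."""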
def __init__(
self,
compute_environment_name,
_type,
state,
compute_resources,
service_role,
region_name,
):
self.name = compute_environment_name
self.env_type = _type
self.state = state
self.compute_resources = compute_resources
self.service_role = service_role
self.arn = make_arn_for_compute_env(
DEFAULT_ACCOUNT_ID, compute_environment_name, region_name
)
self.instances = []
self.ecs_arn = None
self.ecs_name = None
def add_instance(self, instance):
self.instances.append(instance)
def set_ecs(self, arn, name):
self.ecs_arn = arn
self.ecs_name = name
@property
def physical_resource_id(self):
return self.arn
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]
env = backend.create_compute_environment(
resource_name,
properties["Type"],
properties.get("State", "ENABLED"),
lowercase_first_key(properties["ComputeResources"]),
properties["ServiceRole"],
)
arn = env[1]
return backend.get_compute_environment_by_arn(arn)
class JobQueue(BaseModel):
def __init__(
self, name, priority, state, environments, env_order_json, region_name
):
"""
:param name: Job queue name
:type name: str
:param priority: Job queue priority
:type priority: int
:param state: Either ENABLED or DISABLED
:type state: str
:param environments: Compute Environments
:type environments: list of ComputeEnvironment
:param env_order_json: Compute Environments JSON for use when describing
:type env_order_json: list of dict
:param region_name: Region name
:type region_name: str
"""
self.name = name
self.priority = priority
self.state = state
self.environments = environments
self.env_order_json = env_order_json
self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name)
self.status = "VALID"
self.jobs = []
def describe(self):
result = {
"computeEnvironmentOrder": self.env_order_json,
"jobQueueArn": self.arn,
"jobQueueName": self.name,
"priority": self.priority,
"state": self.state,
"status": self.status,
}
return result
@property
def physical_resource_id(self):
return self.arn
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]
        # Need to deal with case differences in the CloudFormation compute resources, e.g. instanceRole vs InstanceRole
        # Hacky fix to normalise keys; it makes me think I want to start spamming cAsEiNsEnSiTiVe dictionaries
compute_envs = [
lowercase_first_key(dict_item)
for dict_item in properties["ComputeEnvironmentOrder"]
]
queue = backend.create_job_queue(
queue_name=resource_name,
priority=properties["Priority"],
state=properties.get("State", "ENABLED"),
compute_env_order=compute_envs,
)
arn = queue[1]
return backend.get_job_queue_by_arn(arn)
class JobDefinition(BaseModel):
def __init__(
self,
name,
parameters,
_type,
container_properties,
region_name,
revision=0,
retry_strategy=0,
):
self.name = name
self.retries = retry_strategy
self.type = _type
self.revision = revision
self._region = region_name
self.container_properties = container_properties
self.arn = None
self.status = "INACTIVE"
if parameters is None:
parameters = {}
self.parameters = parameters
self._validate()
self._update_arn()
def _update_arn(self):
self.revision += 1
self.arn = make_arn_for_task_def(
DEFAULT_ACCOUNT_ID, self.name, self.revision, self._region
)
def _validate(self):
if self.type not in ("container",):
raise ClientException('type must be one of "container"')
        # For future use, when containers aren't the only thing in Batch
if self.type != "container":
raise NotImplementedError()
if not isinstance(self.parameters, dict):
raise ClientException("parameters must be a string to string map")
if "image" not in self.container_properties:
raise ClientException("containerProperties must contain image")
if "memory" not in self.container_properties:
raise ClientException("containerProperties must contain memory")
if self.container_properties["memory"] < 4:
raise ClientException("container memory limit must be greater than 4")
if "vcpus" not in self.container_properties:
raise ClientException("containerProperties must contain vcpus")
if self.container_properties["vcpus"] < 1:
raise ClientException("container vcpus limit must be greater than 0")
def update(self, parameters, _type, container_properties, retry_strategy):
if parameters is None:
parameters = self.parameters
if _type is None:
_type = self.type
if container_properties is None:
container_properties = self.container_properties
if retry_strategy is None:
retry_strategy = self.retries
return JobDefinition(
self.name,
parameters,
_type,
container_properties,
region_name=self._region,
revision=self.revision,
retry_strategy=retry_strategy,
)
def describe(self):
result = {
"jobDefinitionArn": self.arn,
"jobDefinitionName": self.name,
"parameters": self.parameters,
"revision": self.revision,
"status": self.status,
"type": self.type,
}
if self.container_properties is not None:
result["containerProperties"] = self.container_properties
if self.retries is not None and self.retries > 0:
result["retryStrategy"] = {"attempts": self.retries}
return result
@property
def physical_resource_id(self):
return self.arn
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
backend = batch_backends[region_name]
properties = cloudformation_json["Properties"]
res = backend.register_job_definition(
def_name=resource_name,
parameters=lowercase_first_key(properties.get("Parameters", {})),
_type="container",
retry_strategy=lowercase_first_key(properties["RetryStrategy"]),
container_properties=lowercase_first_key(properties["ContainerProperties"]),
)
arn = res[1]
return backend.get_job_definition_by_arn(arn)
class Job(threading.Thread, BaseModel):
def __init__(self, name, job_def, job_queue, log_backend):
"""
Docker Job
:param name: Job Name
:param job_def: Job definition
:type: job_def: JobDefinition
:param job_queue: Job Queue
:param log_backend: Log backend
:type log_backend: moto.logs.models.LogsBackend
"""
threading.Thread.__init__(self)
self.job_name = name
self.job_id = str(uuid.uuid4())
self.job_definition = job_def
self.job_queue = job_queue
self.job_state = "SUBMITTED" # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED
self.job_queue.jobs.append(self)
self.job_started_at = datetime.datetime(1970, 1, 1)
self.job_stopped_at = datetime.datetime(1970, 1, 1)
self.job_stopped = False
        self.job_stopped_reason = None
        self.log_stream_name = None  # Set once the container's logs have been shipped to CloudWatch
self.stop = False
self.daemon = True
self.name = "MOTO-BATCH-" + self.job_id
self.docker_client = docker.from_env()
self._log_backend = log_backend
# Unfortunately mocking replaces this method w/o fallback enabled, so we
# need to replace it if we detect it's been mocked
if requests.adapters.HTTPAdapter.send != _orig_adapter_send:
_orig_get_adapter = self.docker_client.api.get_adapter
def replace_adapter_send(*args, **kwargs):
adapter = _orig_get_adapter(*args, **kwargs)
if isinstance(adapter, requests.adapters.HTTPAdapter):
adapter.send = functools.partial(_orig_adapter_send, adapter)
return adapter
self.docker_client.api.get_adapter = replace_adapter_send
def describe(self):
result = {
"jobDefinition": self.job_definition.arn,
"jobId": self.job_id,
"jobName": self.job_name,
"jobQueue": self.job_queue.arn,
"startedAt": datetime2int(self.job_started_at),
"status": self.job_state,
"dependsOn": [],
}
if self.job_stopped:
result["stoppedAt"] = datetime2int(self.job_stopped_at)
result["container"] = {}
result["container"]["command"] = [
'/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"'
]
result["container"]["privileged"] = False
result["container"]["readonlyRootFilesystem"] = False
result["container"]["ulimits"] = {}
result["container"]["vcpus"] = 1
result["container"]["volumes"] = ""
result["container"]["logStreamName"] = self.log_stream_name
if self.job_stopped_reason is not None:
result["statusReason"] = self.job_stopped_reason
return result
def run(self):
"""
Run the container.
Logic is as follows:
Generate container info (eventually from task definition)
Start container
Loop whilst not asked to stop and the container is running.
Get all logs from container between the last time I checked and now.
Convert logs into cloudwatch format
Put logs into cloudwatch
:return:
"""
try:
self.job_state = "PENDING"
time.sleep(1)
image = "alpine:latest"
cmd = '/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"'
name = "{0}-{1}".format(self.job_name, self.job_id)
self.job_state = "RUNNABLE"
# TODO setup ecs container instance
time.sleep(1)
self.job_state = "STARTING"
log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON)
container = self.docker_client.containers.run(
image, cmd, detach=True, name=name, log_config=log_config
)
self.job_state = "RUNNING"
self.job_started_at = datetime.datetime.now()
try:
# Log collection
logs_stdout = []
logs_stderr = []
container.reload()
                # Dodgy hack: we can only fetch docker logs at one-second granularity, but we want to
                # loop more often so we can stop quickly if asked to; this should all go away if we go async.
                # There also seems to be some dodgyness when sending an integer to docker logs, and some
                # events appear to be duplicated.
now = datetime.datetime.now()
i = 1
while container.status == "running" and not self.stop:
time.sleep(0.15)
if i % 10 == 0:
logs_stderr.extend(
container.logs(
stdout=False,
stderr=True,
timestamps=True,
since=datetime2int(now),
)
.decode()
.split("\n")
)
logs_stdout.extend(
container.logs(
stdout=True,
stderr=False,
timestamps=True,
since=datetime2int(now),
)
.decode()
.split("\n")
)
now = datetime.datetime.now()
container.reload()
i += 1
# Container should be stopped by this point... unless asked to stop
if container.status == "running":
container.kill()
self.job_stopped_at = datetime.datetime.now()
# Get final logs
logs_stderr.extend(
container.logs(
stdout=False,
stderr=True,
timestamps=True,
since=datetime2int(now),
)
.decode()
.split("\n")
)
logs_stdout.extend(
container.logs(
stdout=True,
stderr=False,
timestamps=True,
since=datetime2int(now),
)
.decode()
.split("\n")
)
self.job_state = "SUCCEEDED" if not self.stop else "FAILED"
# Process logs
logs_stdout = [x for x in logs_stdout if len(x) > 0]
logs_stderr = [x for x in logs_stderr if len(x) > 0]
logs = []
for line in logs_stdout + logs_stderr:
date, line = line.split(" ", 1)
date = dateutil.parser.parse(date)
date = int(date.timestamp())
logs.append({"timestamp": date, "message": line.strip()})
# Send to cloudwatch
log_group = "/aws/batch/job"
stream_name = "{0}/default/{1}".format(
self.job_definition.name, self.job_id
)
self.log_stream_name = stream_name
self._log_backend.ensure_log_group(log_group, None)
self._log_backend.create_log_stream(log_group, stream_name)
self._log_backend.put_log_events(log_group, stream_name, logs, None)
except Exception as err:
logger.error(
"Failed to run AWS Batch container {0}. Error {1}".format(
self.name, err
)
)
self.job_state = "FAILED"
container.kill()
finally:
container.remove()
except Exception as err:
logger.error(
"Failed to run AWS Batch container {0}. Error {1}".format(
self.name, err
)
)
self.job_state = "FAILED"
self.job_stopped = True
self.job_stopped_at = datetime.datetime.now()
def terminate(self, reason):
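        """Flag the job to stop; the run() loop sees self.stop, kills the container and marks the job FAILED."""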
if not self.stop:
self.stop = True
self.job_stopped_reason = reason
class BatchBackend(BaseBackend):
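    """In-memory backend for the AWS Batch API; submitted jobs run real Docker containers locally."""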
def __init__(self, region_name=None):
super(BatchBackend, self).__init__()
self.region_name = region_name
self._compute_environments = {}
self._job_queues = {}
self._job_definitions = {}
self._jobs = {}
@property
def iam_backend(self):
"""
:return: IAM Backend
:rtype: moto.iam.models.IAMBackend
"""
return iam_backends["global"]
@property
def ec2_backend(self):
"""
:return: EC2 Backend
:rtype: moto.ec2.models.EC2Backend
"""
return ec2_backends[self.region_name]
@property
def ecs_backend(self):
"""
:return: ECS Backend
:rtype: moto.ecs.models.EC2ContainerServiceBackend
"""
return ecs_backends[self.region_name]
@property
def logs_backend(self):
"""
        :return: Logs Backend
:rtype: moto.logs.models.LogsBackend
"""
return logs_backends[self.region_name]
def reset(self):
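        """Stop any still-running job threads, then re-initialise the backend state."""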
region_name = self.region_name
for job in self._jobs.values():
if job.job_state not in ("FAILED", "SUCCEEDED"):
job.stop = True
# Try to join
job.join(0.2)
self.__dict__ = {}
self.__init__(region_name)
def get_compute_environment_by_arn(self, arn):
return self._compute_environments.get(arn)
def get_compute_environment_by_name(self, name):
for comp_env in self._compute_environments.values():
if comp_env.name == name:
return comp_env
return None
def get_compute_environment(self, identifier):
"""
Get compute environment by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Compute Environment or None
:rtype: ComputeEnvironment or None
"""
env = self.get_compute_environment_by_arn(identifier)
if env is None:
env = self.get_compute_environment_by_name(identifier)
return env
def get_job_queue_by_arn(self, arn):
return self._job_queues.get(arn)
    def get_job_queue_by_name(self, name):
        for job_queue in self._job_queues.values():
            if job_queue.name == name:
                return job_queue
        return None
def get_job_queue(self, identifier):
"""
Get job queue by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Job Queue or None
:rtype: JobQueue or None
"""
env = self.get_job_queue_by_arn(identifier)
if env is None:
env = self.get_job_queue_by_name(identifier)
return env
def get_job_definition_by_arn(self, arn):
return self._job_definitions.get(arn)
def get_job_definition_by_name(self, name):
latest_revision = -1
latest_job = None
for job_def in self._job_definitions.values():
if job_def.name == name and job_def.revision > latest_revision:
latest_job = job_def
latest_revision = job_def.revision
return latest_job
    def get_job_definition_by_name_revision(self, name, revision):
        for job_def in self._job_definitions.values():
            # revision may arrive as a string split out of a "name:revision" identifier
            if job_def.name == name and str(job_def.revision) == str(revision):
                return job_def
        return None
def get_job_definition(self, identifier):
"""
        Get a job definition by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Job definition or None
:rtype: JobDefinition or None
"""
job_def = self.get_job_definition_by_arn(identifier)
if job_def is None:
if ":" in identifier:
job_def = self.get_job_definition_by_name_revision(
*identifier.split(":", 1)
)
else:
job_def = self.get_job_definition_by_name(identifier)
return job_def
def get_job_definitions(self, identifier):
"""
Get job definitions by name or ARN
:param identifier: Name or ARN
:type identifier: str
        :return: Job definitions matching the identifier
        :rtype: list of JobDefinition
"""
result = []
env = self.get_job_definition_by_arn(identifier)
if env is not None:
result.append(env)
else:
for value in self._job_definitions.values():
if value.name == identifier:
result.append(value)
return result
def get_job_by_id(self, identifier):
"""
Get job by id
:param identifier: Job ID
:type identifier: str
:return: Job
:rtype: Job
"""
try:
return self._jobs[identifier]
except KeyError:
return None
def describe_compute_environments(
self, environments=None, max_results=None, next_token=None
):
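        """
        Describe compute environments, optionally filtered by name or ARN.
        max_results and next_token are currently ignored.

        :param environments: Compute environment names or ARNs to filter on
        :type environments: list of str
        :return: List of compute environment descriptions
        :rtype: list of dict
        """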
envs = set()
if environments is not None:
envs = set(environments)
result = []
for arn, environment in self._compute_environments.items():
# Filter shortcut
if len(envs) > 0 and arn not in envs and environment.name not in envs:
continue
json_part = {
"computeEnvironmentArn": arn,
"computeEnvironmentName": environment.name,
"ecsClusterArn": environment.ecs_arn,
"serviceRole": environment.service_role,
"state": environment.state,
"type": environment.env_type,
"status": "VALID",
}
if environment.env_type == "MANAGED":
json_part["computeResources"] = environment.compute_resources
result.append(json_part)
return result
def create_compute_environment(
self, compute_environment_name, _type, state, compute_resources, service_role
):
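        """
        Create a compute environment. For MANAGED environments this also starts
        mock EC2 instances to cover the desired vCPUs plus a backing ECS cluster.

        :return: Tuple of name and ARN
        :rtype: tuple of str
        """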
# Validate
if COMPUTE_ENVIRONMENT_NAME_REGEX.match(compute_environment_name) is None:
raise InvalidParameterValueException(
"Compute environment name does not match ^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$"
)
if self.get_compute_environment_by_name(compute_environment_name) is not None:
raise InvalidParameterValueException(
"A compute environment already exists with the name {0}".format(
compute_environment_name
)
)
# Look for IAM role
try:
self.iam_backend.get_role_by_arn(service_role)
except IAMNotFoundException:
raise InvalidParameterValueException(
"Could not find IAM role {0}".format(service_role)
)
if _type not in ("MANAGED", "UNMANAGED"):
raise InvalidParameterValueException(
"type {0} must be one of MANAGED | UNMANAGED".format(service_role)
)
if state is not None and state not in ("ENABLED", "DISABLED"):
raise InvalidParameterValueException(
"state {0} must be one of ENABLED | DISABLED".format(state)
)
if compute_resources is None and _type == "MANAGED":
            raise InvalidParameterValueException(
                "computeResources must be specified when creating a MANAGED environment"
            )
elif compute_resources is not None:
self._validate_compute_resources(compute_resources)
# By here, all values except SPOT ones have been validated
new_comp_env = ComputeEnvironment(
compute_environment_name,
_type,
state,
compute_resources,
service_role,
region_name=self.region_name,
)
self._compute_environments[new_comp_env.arn] = new_comp_env
        # OK, by this point everything is legit, so if it's MANAGED then start some instances
if _type == "MANAGED":
cpus = int(
compute_resources.get("desiredvCpus", compute_resources["minvCpus"])
)
instance_types = compute_resources["instanceTypes"]
needed_instance_types = self.find_min_instances_to_meet_vcpus(
instance_types, cpus
)
# Create instances
# Will loop over and over so we get decent subnet coverage
subnet_cycle = cycle(compute_resources["subnets"])
for instance_type in needed_instance_types:
reservation = self.ec2_backend.add_instances(
image_id="ami-ecs-optimised", # Todo import AMIs
count=1,
user_data=None,
security_group_names=[],
instance_type=instance_type,
region_name=self.region_name,
subnet_id=six.next(subnet_cycle),
key_name=compute_resources.get("ec2KeyPair", "AWS_OWNED"),
security_group_ids=compute_resources["securityGroupIds"],
)
new_comp_env.add_instance(reservation.instances[0])
# Create ECS cluster
# Should be of format P2OnDemand_Batch_UUID
cluster_name = "OnDemand_Batch_" + str(uuid.uuid4())
ecs_cluster = self.ecs_backend.create_cluster(cluster_name)
new_comp_env.set_ecs(ecs_cluster.arn, cluster_name)
return compute_environment_name, new_comp_env.arn
def _validate_compute_resources(self, cr):
"""
Checks contents of sub dictionary for managed clusters
:param cr: computeResources
:type cr: dict
"""
for param in (
"instanceRole",
"maxvCpus",
"minvCpus",
"instanceTypes",
"securityGroupIds",
"subnets",
"type",
):
if param not in cr:
raise InvalidParameterValueException(
"computeResources must contain {0}".format(param)
)
if self.iam_backend.get_role_by_arn(cr["instanceRole"]) is None:
raise InvalidParameterValueException(
"could not find instanceRole {0}".format(cr["instanceRole"])
)
if cr["maxvCpus"] < 0:
raise InvalidParameterValueException("maxVCpus must be positive")
if cr["minvCpus"] < 0:
raise InvalidParameterValueException("minVCpus must be positive")
if cr["maxvCpus"] < cr["minvCpus"]:
raise InvalidParameterValueException(
"maxVCpus must be greater than minvCpus"
)
if len(cr["instanceTypes"]) == 0:
raise InvalidParameterValueException(
"At least 1 instance type must be provided"
)
for instance_type in cr["instanceTypes"]:
if instance_type == "optimal":
pass # Optimal should pick from latest of current gen
elif instance_type not in EC2_INSTANCE_TYPES:
raise InvalidParameterValueException(
"Instance type {0} does not exist".format(instance_type)
)
for sec_id in cr["securityGroupIds"]:
if self.ec2_backend.get_security_group_from_id(sec_id) is None:
raise InvalidParameterValueException(
"security group {0} does not exist".format(sec_id)
)
if len(cr["securityGroupIds"]) == 0:
raise InvalidParameterValueException(
"At least 1 security group must be provided"
)
for subnet_id in cr["subnets"]:
try:
self.ec2_backend.get_subnet(subnet_id)
except InvalidSubnetIdError:
raise InvalidParameterValueException(
"subnet {0} does not exist".format(subnet_id)
)
if len(cr["subnets"]) == 0:
raise InvalidParameterValueException("At least 1 subnet must be provided")
if cr["type"] not in ("EC2", "SPOT"):
raise InvalidParameterValueException(
"computeResources.type must be either EC2 | SPOT"
)
if cr["type"] == "SPOT":
raise InternalFailure("SPOT NOT SUPPORTED YET")
@staticmethod
def find_min_instances_to_meet_vcpus(instance_types, target):
"""
        Finds the minimum needed instances to meet a vcpu target
:param instance_types: Instance types, like ['t2.medium', 't2.small']
:type instance_types: list of str
:param target: VCPU target
:type target: float
:return: List of instance types
:rtype: list of str
"""
# vcpus = [ (vcpus, instance_type), (vcpus, instance_type), ... ]
instance_vcpus = []
instances = []
for instance_type in instance_types:
if instance_type == "optimal":
instance_type = "m4.4xlarge"
instance_vcpus.append(
(EC2_INSTANCE_TYPES[instance_type]["vcpus"], instance_type)
)
instance_vcpus = sorted(instance_vcpus, key=lambda item: item[0], reverse=True)
        # Loop through:
        # if the biggest instance type is smaller than the target, and len(instance_types) > 1, use the biggest type
        # if the biggest instance type is bigger than the target, and len(instance_types) > 1, remove it and move on
        # if the biggest instance type is bigger than the target and len(instance_types) == 1, add the instance and finish
        # if the biggest instance type is smaller than the target and len(instance_types) == 1, loop adding instances until target <= 0
        # ^^ boils down to keep adding the last type till the target vcpus goes non-positive
        # #Algorithm ;-) ... Could probably be done better with some quality lambdas
while target > 0:
current_vcpu, current_instance = instance_vcpus[0]
if len(instance_vcpus) > 1:
if current_vcpu <= target:
target -= current_vcpu
instances.append(current_instance)
else:
# try next biggest instance
instance_vcpus.pop(0)
else:
                # We're on the last instance
target -= current_vcpu
instances.append(current_instance)
return instances
def delete_compute_environment(self, compute_environment_name):
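        """
        Delete a compute environment and its backing ECS cluster, terminating
        the environment's EC2 instances if it was MANAGED.

        :param compute_environment_name: Compute environment name or ARN
        :type compute_environment_name: str
        """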
if compute_environment_name is None:
raise InvalidParameterValueException("Missing computeEnvironment parameter")
compute_env = self.get_compute_environment(compute_environment_name)
if compute_env is not None:
# Pop ComputeEnvironment
self._compute_environments.pop(compute_env.arn)
# Delete ECS cluster
self.ecs_backend.delete_cluster(compute_env.ecs_name)
if compute_env.env_type == "MANAGED":
                # Terminate the instances that were started for this environment
instance_ids = [instance.id for instance in compute_env.instances]
self.ec2_backend.terminate_instances(instance_ids)
def update_compute_environment(
self, compute_environment_name, state, compute_resources, service_role
):
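        """
        Update a compute environment's state and/or service role; resizing via
        computeResources is not implemented yet.

        :return: Tuple of name and ARN
        :rtype: tuple of str
        """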
# Validate
compute_env = self.get_compute_environment(compute_environment_name)
if compute_env is None:
            raise ClientException(
                "Compute environment {0} does not exist".format(compute_environment_name)
            )
# Look for IAM role
        if service_role is not None:
            # Validate the role exists, but store the ARN as create_compute_environment does
            try:
                self.iam_backend.get_role_by_arn(service_role)
            except IAMNotFoundException:
                raise InvalidParameterValueException(
                    "Could not find IAM role {0}".format(service_role)
                )
            compute_env.service_role = service_role
if state is not None:
if state not in ("ENABLED", "DISABLED"):
raise InvalidParameterValueException(
"state {0} must be one of ENABLED | DISABLED".format(state)
)
compute_env.state = state
if compute_resources is not None:
# TODO Implement resizing of instances based on changing vCpus
# compute_resources CAN contain desiredvCpus, maxvCpus, minvCpus, and can contain none of them.
pass
return compute_env.name, compute_env.arn
def create_job_queue(self, queue_name, priority, state, compute_env_order):
"""
Create a job queue
:param queue_name: Queue name
:type queue_name: str
:param priority: Queue priority
:type priority: int
:param state: Queue state
:type state: string
:param compute_env_order: Compute environment list
:type compute_env_order: list of dict
:return: Tuple of Name, ARN
:rtype: tuple of str
"""
for variable, var_name in (
(queue_name, "jobQueueName"),
(priority, "priority"),
(state, "state"),
(compute_env_order, "computeEnvironmentOrder"),
):
if variable is None:
raise ClientException("{0} must be provided".format(var_name))
if state not in ("ENABLED", "DISABLED"):
raise ClientException(
"state {0} must be one of ENABLED | DISABLED".format(state)
)
if self.get_job_queue_by_name(queue_name) is not None:
raise ClientException("Job queue {0} already exists".format(queue_name))
if len(compute_env_order) == 0:
raise ClientException("At least 1 compute environment must be provided")
try:
            # Orders the list and extracts the computeEnvironment ARNs
ordered_compute_environments = [
item["computeEnvironment"]
for item in sorted(compute_env_order, key=lambda x: x["order"])
]
env_objects = []
# Check each ARN exists, then make a list of compute env's
for arn in ordered_compute_environments:
env = self.get_compute_environment_by_arn(arn)
if env is None:
raise ClientException(
"Compute environment {0} does not exist".format(arn)
)
env_objects.append(env)
except Exception:
raise ClientException("computeEnvironmentOrder is malformed")
# Create new Job Queue
queue = JobQueue(
queue_name,
priority,
state,
env_objects,
compute_env_order,
self.region_name,
)
self._job_queues[queue.arn] = queue
return queue_name, queue.arn
def describe_job_queues(self, job_queues=None, max_results=None, next_token=None):
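        """Describe job queues, optionally filtered by name or ARN; max_results and next_token are currently ignored."""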
envs = set()
if job_queues is not None:
envs = set(job_queues)
result = []
for arn, job_queue in self._job_queues.items():
# Filter shortcut
if len(envs) > 0 and arn not in envs and job_queue.name not in envs:
continue
result.append(job_queue.describe())
return result
def update_job_queue(self, queue_name, priority, state, compute_env_order):
"""
Update a job queue
:param queue_name: Queue name
:type queue_name: str
:param priority: Queue priority
:type priority: int
:param state: Queue state
:type state: string
:param compute_env_order: Compute environment list
:type compute_env_order: list of dict
:return: Tuple of Name, ARN
:rtype: tuple of str
"""
if queue_name is None:
raise ClientException("jobQueueName must be provided")
job_queue = self.get_job_queue(queue_name)
if job_queue is None:
raise ClientException("Job queue {0} does not exist".format(queue_name))
if state is not None:
if state not in ("ENABLED", "DISABLED"):
raise ClientException(
"state {0} must be one of ENABLED | DISABLED".format(state)
)
job_queue.state = state
if compute_env_order is not None:
if len(compute_env_order) == 0:
raise ClientException("At least 1 compute environment must be provided")
try:
                # Orders the list and extracts the computeEnvironment ARNs
ordered_compute_environments = [
item["computeEnvironment"]
for item in sorted(compute_env_order, key=lambda x: x["order"])
]
env_objects = []
# Check each ARN exists, then make a list of compute env's
for arn in ordered_compute_environments:
env = self.get_compute_environment_by_arn(arn)
if env is None:
raise ClientException(
"Compute environment {0} does not exist".format(arn)
)
env_objects.append(env)
except Exception:
raise ClientException("computeEnvironmentOrder is malformed")
job_queue.env_order_json = compute_env_order
job_queue.environments = env_objects
if priority is not None:
job_queue.priority = priority
return queue_name, job_queue.arn
def delete_job_queue(self, queue_name):
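        """Delete a job queue by name or ARN; silently does nothing if the queue does not exist."""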
job_queue = self.get_job_queue(queue_name)
if job_queue is not None:
del self._job_queues[job_queue.arn]
def register_job_definition(
self, def_name, parameters, _type, retry_strategy, container_properties
):
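        """
        Register a job definition; registering an existing name again bumps the revision.

        :return: Tuple of name, ARN and revision
        :rtype: tuple
        """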
if def_name is None:
raise ClientException("jobDefinitionName must be provided")
job_def = self.get_job_definition_by_name(def_name)
if retry_strategy is not None:
try:
retry_strategy = retry_strategy["attempts"]
except Exception:
raise ClientException("retryStrategy is malformed")
if job_def is None:
2019-10-31 08:44:26 -07:00
job_def = JobDefinition(
def_name,
parameters,
_type,
container_properties,
region_name=self.region_name,
retry_strategy=retry_strategy,
)
else:
# Make new jobdef
job_def = job_def.update(
parameters, _type, container_properties, retry_strategy
)
self._job_definitions[job_def.arn] = job_def
return def_name, job_def.arn, job_def.revision
def deregister_job_definition(self, def_name):
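        """Deregister a job definition identified by ARN or by name:revision."""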
job_def = self.get_job_definition_by_arn(def_name)
if job_def is None and ":" in def_name:
name, revision = def_name.split(":", 1)
job_def = self.get_job_definition_by_name_revision(name, revision)
if job_def is not None:
del self._job_definitions[job_def.arn]
def describe_job_definitions(
self,
job_def_name=None,
job_def_list=None,
status=None,
max_results=None,
next_token=None,
):
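        """
        Describe job definitions, filtered by a single name, a list of names/ARNs,
        and/or a status; with no filter, all definitions are returned.

        :return: List of matching job definitions
        :rtype: list of JobDefinition
        """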
jobs = []
# As a job name can reference multiple revisions, we get a list of them
if job_def_name is not None:
job_def = self.get_job_definitions(job_def_name)
if job_def is not None:
jobs.extend(job_def)
elif job_def_list is not None:
for job in job_def_list:
job_def = self.get_job_definitions(job)
if job_def is not None:
jobs.extend(job_def)
else:
jobs.extend(self._job_definitions.values())
        # Got all the job defs we're after; filter them by status
if status is not None:
return [job for job in jobs if job.status == status]
return jobs
def submit_job(
self,
job_name,
job_def_id,
job_queue,
parameters=None,
retries=None,
depends_on=None,
container_overrides=None,
):
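        """
        Submit a job to a queue; the job starts immediately on a background
        thread that runs a real Docker container.

        :param job_name: Job name
        :type job_name: str
        :param job_def_id: Job definition name, name:revision, or ARN
        :type job_def_id: str
        :param job_queue: Job queue name or ARN
        :type job_queue: str
        :return: Tuple of job name and job ID
        :rtype: tuple of str
        """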
# TODO parameters, retries (which is a dict raw from request), job dependencies and container overrides are ignored for now
# Look for job definition
job_def = self.get_job_definition(job_def_id)
if job_def is None:
raise ClientException(
"Job definition {0} does not exist".format(job_def_id)
)
queue = self.get_job_queue(job_queue)
if queue is None:
raise ClientException("Job queue {0} does not exist".format(job_queue))
job = Job(job_name, job_def, queue, log_backend=self.logs_backend)
self._jobs[job.job_id] = job
# Here comes the fun
job.start()
return job_name, job.job_id
def describe_jobs(self, jobs):
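        """Describe jobs, optionally restricted to the given job IDs."""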
job_filter = set()
if jobs is not None:
job_filter = set(jobs)
result = []
for key, job in self._jobs.items():
if len(job_filter) > 0 and key not in job_filter:
continue
result.append(job.describe())
return result
def list_jobs(self, job_queue, job_status=None, max_results=None, next_token=None):
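        """
        List the jobs in a queue, optionally filtered by job status.
        max_results and next_token are currently ignored.
        """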
jobs = []
        queue = self.get_job_queue(job_queue)
        if queue is None:
            raise ClientException("Job queue {0} does not exist".format(job_queue))
if job_status is not None and job_status not in (
"SUBMITTED",
"PENDING",
"RUNNABLE",
"STARTING",
"RUNNING",
"SUCCEEDED",
"FAILED",
):
raise ClientException(
"Job status is not one of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED"
)
        for job in queue.jobs:
if job_status is not None and job.job_state != job_status:
continue
jobs.append(job)
return jobs
def terminate_job(self, job_id, reason):
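        """Terminate a job, flagging its thread to stop and recording the reason."""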
if job_id is None:
raise ClientException("Job ID does not exist")
if reason is None:
raise ClientException("Reason does not exist")
job = self.get_job_by_id(job_id)
if job is None:
raise ClientException("Job not found")
job.terminate(reason)
available_regions = boto3.session.Session().get_available_regions("batch")
batch_backends = {
region: BatchBackend(region_name=region) for region in available_regions
}