Merge pull request #2636 from kislyuk/akislyuk-pass-container-params

Batch: derive and pass container params to Docker
This commit is contained in:
Mike Grima 2019-12-14 13:45:10 -08:00 committed by GitHub
commit d0e555df41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -285,7 +285,7 @@ class JobDefinition(BaseModel):
class Job(threading.Thread, BaseModel):
def __init__(self, name, job_def, job_queue, log_backend):
def __init__(self, name, job_def, job_queue, log_backend, container_overrides):
"""
Docker Job
@ -301,6 +301,7 @@ class Job(threading.Thread, BaseModel):
self.job_name = name
self.job_id = str(uuid.uuid4())
self.job_definition = job_def
self.container_overrides = container_overrides
self.job_queue = job_queue
self.job_state = "SUBMITTED" # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED
self.job_queue.jobs.append(self)
@ -357,6 +358,11 @@ class Job(threading.Thread, BaseModel):
result["statusReason"] = self.job_stopped_reason
return result
def _get_container_property(self, p, default):
return self.container_overrides.get(
p, self.job_definition.container_properties.get(p, default)
)
def run(self):
"""
Run the container.
@ -375,8 +381,33 @@ class Job(threading.Thread, BaseModel):
self.job_state = "PENDING"
time.sleep(1)
image = "alpine:latest"
cmd = '/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"'
image = self.job_definition.container_properties.get(
"image", "alpine:latest"
)
privileged = self.job_definition.container_properties.get(
"privileged", False
)
cmd = self._get_container_property(
"command",
'/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"',
)
environment = {
e["name"]: e["value"]
for e in self._get_container_property("environment", [])
}
volumes = {
v["name"]: v["host"]
for v in self._get_container_property("volumes", [])
}
mounts = [
docker.types.Mount(
m["containerPath"],
volumes[m["sourceVolume"]]["sourcePath"],
type="bind",
read_only=m["readOnly"],
)
for m in self._get_container_property("mountPoints", [])
]
name = "{0}-{1}".format(self.job_name, self.job_id)
self.job_state = "RUNNABLE"
@ -386,7 +417,14 @@ class Job(threading.Thread, BaseModel):
self.job_state = "STARTING"
log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON)
container = self.docker_client.containers.run(
image, cmd, detach=True, name=name, log_config=log_config
image,
cmd,
detach=True,
name=name,
log_config=log_config,
environment=environment,
mounts=mounts,
privileged=privileged,
)
self.job_state = "RUNNING"
self.job_started_at = datetime.datetime.now()
@ -1209,7 +1247,13 @@ class BatchBackend(BaseBackend):
if queue is None:
raise ClientException("Job queue {0} does not exist".format(job_queue))
job = Job(job_name, job_def, queue, log_backend=self.logs_backend)
job = Job(
job_name,
job_def,
queue,
log_backend=self.logs_backend,
container_overrides=container_overrides,
)
self._jobs[job.job_id] = job
# Here comes the fun