Feature: Batch: cancel_job (#3769)

This commit is contained in:
Bert Blommers 2021-08-22 12:29:23 +01:00 committed by GitHub
parent 5c24071f43
commit 914d07027f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 220 additions and 147 deletions

View File

@ -1029,7 +1029,7 @@
<details>
<summary>78% implemented</summary>
- [ ] cancel_job
- [X] cancel_job
- [X] create_compute_environment
- [X] create_job_queue
- [X] delete_compute_environment

View File

@ -460,11 +460,12 @@ class Job(threading.Thread, BaseModel, DockerModel):
# TODO setup ecs container instance
self.job_started_at = datetime.datetime.now()
self.job_state = "STARTING"
log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON)
image_repository, image_tag = parse_image_ref(image)
# avoid explicit pulling here, to allow using cached images
# self.docker_client.images.pull(image_repository, image_tag)
self.job_state = "STARTING"
container = self.docker_client.containers.run(
image,
cmd,
@ -1357,6 +1358,12 @@ class BatchBackend(BaseBackend):
return jobs
def cancel_job(self, job_id, reason):
    """Cancel a job that has not started running yet.

    Mirrors AWS behaviour: only jobs still in the SUBMITTED, PENDING or
    RUNNABLE state are terminated with the given reason.  Jobs that have
    already started are deliberately left alone - the user has to call
    terminate_job explicitly for those.
    """
    cancellable_states = ("SUBMITTED", "PENDING", "RUNNABLE")
    job = self.get_job_by_id(job_id)
    if job.job_state in cancellable_states:
        job.terminate(reason)
    # No-Op for jobs that have already started - user has to explicitly terminate those
def terminate_job(self, job_id, reason):
if job_id is None:
raise ClientException("Job ID does not exist")

View File

@ -292,7 +292,9 @@ class BatchResponse(BaseResponse):
return ""
# CancelJob
def canceljob(
self,
): # Theres some AWS semantics on the differences but for us they're identical ;-)
return self.terminatejob()
def canceljob(self):
    """Handle the CancelJob API action.

    Extracts jobId/reason from the request and delegates to the backend;
    the API returns an empty body.
    """
    self.batch_backend.cancel_job(
        self._get_param("jobId"), self._get_param("reason")
    )
    return ""

View File

@ -125,6 +125,93 @@ def test_create_job_queue_without_compute_environment():
err["Message"].should.equal("At least 1 compute environment must be provided")
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_describe_job_queue_unknown_value():
    """Describing a queue name that does not exist returns an empty list."""
    _, _, _, _, batch_client = _get_clients()

    resp = batch_client.describe_job_queues(jobQueues=["test_invalid_queue"])

    resp.should.have.key("jobQueues").being.length_of(0)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_job_queue_twice():
    """A second create_job_queue with the same name raises ClientException."""
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    env = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    # Identical arguments for both creation attempts
    queue_kwargs = dict(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[
            {"order": 123, "computeEnvironment": env["computeEnvironmentArn"]}
        ],
    )
    batch_client.create_job_queue(**queue_kwargs)

    # The duplicate creation must be rejected
    with pytest.raises(ClientError) as ex:
        batch_client.create_job_queue(**queue_kwargs)

    error = ex.value.response["Error"]
    error["Code"].should.equal("ClientException")
    error["Message"].should.equal("Job queue test_job_queue already exists")
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_job_queue_incorrect_state():
    """create_job_queue with an invalid state value raises ClientException."""
    _, _, _, _, batch_client = _get_clients()

    with pytest.raises(ClientError) as ex:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue2",
            state="JUNK",
            priority=123,
            computeEnvironmentOrder=[],
        )

    error = ex.value.response["Error"]
    error["Code"].should.equal("ClientException")
    error["Message"].should.equal("state JUNK must be one of ENABLED | DISABLED")
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_job_queue_without_compute_environment():
    """A job queue needs at least one compute environment in its order list."""
    _, _, _, _, batch_client = _get_clients()

    with pytest.raises(ClientError) as ex:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue3",
            state="ENABLED",
            priority=123,
            computeEnvironmentOrder=[],
        )

    error = ex.value.response["Error"]
    error["Code"].should.equal("ClientException")
    error["Message"].should.equal("At least 1 compute environment must be provided")
@mock_ec2
@mock_ecs
@mock_iam

View File

@ -95,34 +95,9 @@ def test_submit_job():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = "test_compute_env"
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName="test_job_queue",
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName="sayhellotomylittlefriend",
type="container",
containerProperties={
"image": "busybox:latest",
"vcpus": 1,
"memory": 128,
"command": ["echo", "hello"],
},
)
job_def_arn = resp["jobDefinitionArn"]
job_def_name = "sayhellotomylittlefriend"
commands = ["echo", "hello"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
@ -132,7 +107,7 @@ def test_submit_job():
_wait_for_job_status(batch_client, job_id, "SUCCEEDED")
resp = logs_client.describe_log_streams(
logGroupName="/aws/batch/job", logStreamNamePrefix="sayhellotomylittlefriend"
logGroupName="/aws/batch/job", logStreamNamePrefix=job_def_name
)
resp["logStreams"].should.have.length_of(1)
ls_name = resp["logStreams"][0]["logStreamName"]
@ -153,34 +128,9 @@ def test_list_jobs():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = "test_compute_env"
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName="test_job_queue",
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName="sleep5",
type="container",
containerProperties={
"image": "busybox:latest",
"vcpus": 1,
"memory": 128,
"command": ["sleep", "5"],
},
)
job_def_arn = resp["jobDefinitionArn"]
job_def_name = "sleep5"
commands = ["sleep", "5"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
@ -213,34 +163,9 @@ def test_terminate_job():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = "test_compute_env"
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName="test_job_queue",
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName="echo-sleep-echo",
type="container",
containerProperties={
"image": "busybox:latest",
"vcpus": 1,
"memory": 128,
"command": ["sh", "-c", "echo start && sleep 30 && echo stop"],
},
)
job_def_arn = resp["jobDefinitionArn"]
job_def_name = "echo-sleep-echo"
commands = ["sh", "-c", "echo start && sleep 30 && echo stop"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
@ -259,7 +184,7 @@ def test_terminate_job():
resp["jobs"][0]["statusReason"].should.equal("test_terminate")
resp = logs_client.describe_log_streams(
logGroupName="/aws/batch/job", logStreamNamePrefix="echo-sleep-echo"
logGroupName="/aws/batch/job", logStreamNamePrefix=job_def_name
)
resp["logStreams"].should.have.length_of(1)
ls_name = resp["logStreams"][0]["logStreamName"]
@ -273,6 +198,74 @@ def test_terminate_job():
resp["events"][0]["message"].should.equal("start")
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_cancel_pending_job():
    """A job that has not started running yet can be cancelled."""
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    # We need to be able to cancel a job that has not been started yet
    # Locally, our jobs start so fast that we can't cancel them in time
    # So delay our job, by letting it depend on a slow-running job
    job_def_arn, queue_arn = prepare_job(
        batch_client, ["sleep", "1"], iam_arn, "deptest"
    )
    delayed_job = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
    )["jobId"]

    submit_resp = batch_client.submit_job(
        jobName="test_job_name",
        jobQueue=queue_arn,
        jobDefinition=job_def_arn,
        dependsOn=[{"jobId": delayed_job, "type": "SEQUENTIAL"}],
    )
    job_id = submit_resp["jobId"]

    batch_client.cancel_job(jobId=job_id, reason="test_cancel")
    _wait_for_job_status(batch_client, job_id, "FAILED", seconds_to_wait=10)

    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    job["jobName"].should.equal("test_job_name")
    job["statusReason"].should.equal("test_cancel")
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_cancel_running_job():
    """
    Test verifies that the moment the job has started, we can't cancel anymore
    """
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_arn, queue_arn = prepare_job(
        batch_client, ["echo", "start"], iam_arn, "echo-o-o"
    )
    job_id = batch_client.submit_job(
        jobName="test_job_name", jobQueue=queue_arn, jobDefinition=job_def_arn
    )["jobId"]
    _wait_for_job_status(batch_client, job_id, "STARTING")

    batch_client.cancel_job(jobId=job_id, reason="test_cancel")
    # We cancelled too late, the job was already running. Now we just wait for it to succeed
    _wait_for_job_status(batch_client, job_id, "SUCCEEDED", seconds_to_wait=5)

    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    job["jobName"].should.equal("test_job_name")
    job.shouldnt.have.key("statusReason")
def _wait_for_job_status(client, job_id, status, seconds_to_wait=30):
wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
last_job_status = None
@ -298,34 +291,9 @@ def test_failed_job():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = "test_compute_env"
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName="test_job_queue",
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName="sayhellotomylittlefriend",
type="container",
containerProperties={
"image": "busybox:latest",
"vcpus": 1,
"memory": 128,
"command": ["exit", "1"],
},
)
job_def_arn = resp["jobDefinitionArn"]
job_def_name = "exit-1"
commands = ["exit", "1"]
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
@ -355,34 +323,12 @@ def test_dependencies():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = "test_compute_env"
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
job_def_arn, queue_arn = prepare_job(
batch_client,
commands=["echo", "hello"],
iam_arn=iam_arn,
job_def_name="dependencytest",
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName="test_job_queue",
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName="sayhellotomylittlefriend",
type="container",
containerProperties={
"image": "busybox:latest",
"vcpus": 1,
"memory": 128,
"command": ["echo", "hello"],
},
)
job_def_arn = resp["jobDefinitionArn"]
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
@ -651,3 +597,34 @@ def test_container_overrides():
sure.expect(env_var).to.contain({"name": "TEST2", "value": "from job"})
sure.expect(env_var).to.contain({"name": "AWS_BATCH_JOB_ID", "value": job_id})
def prepare_job(batch_client, commands, iam_arn, job_def_name):
    """Create the compute environment, job queue and job definition that a
    job submission needs.

    :param batch_client: boto3 Batch client (mocked by moto)
    :param commands: container command list for the job definition
    :param iam_arn: service role ARN for the compute environment
    :param job_def_name: name to register the job definition under
    :returns: tuple of (job_definition_arn, job_queue_arn)
    """
    env_resp = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    queue_resp = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[
            {"order": 123, "computeEnvironment": env_resp["computeEnvironmentArn"]}
        ],
    )
    definition_resp = batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        containerProperties={
            "image": "busybox:latest",
            "vcpus": 1,
            "memory": 128,
            "command": commands,
        },
    )
    return definition_resp["jobDefinitionArn"], queue_resp["jobQueueArn"]