diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 665e4ac96..1b8a47b0f 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -1029,7 +1029,7 @@
78% implemented -- [ ] cancel_job +- [X] cancel_job - [X] create_compute_environment - [X] create_job_queue - [X] delete_compute_environment diff --git a/moto/batch/models.py b/moto/batch/models.py index a77015c3d..783c55fc2 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -460,11 +460,12 @@ class Job(threading.Thread, BaseModel, DockerModel): # TODO setup ecs container instance self.job_started_at = datetime.datetime.now() - self.job_state = "STARTING" + log_config = docker.types.LogConfig(type=docker.types.LogConfig.types.JSON) image_repository, image_tag = parse_image_ref(image) # avoid explicit pulling here, to allow using cached images # self.docker_client.images.pull(image_repository, image_tag) + self.job_state = "STARTING" container = self.docker_client.containers.run( image, cmd, @@ -1357,6 +1358,12 @@ class BatchBackend(BaseBackend): return jobs + def cancel_job(self, job_id, reason): + job = self.get_job_by_id(job_id) + if job.job_state in ["SUBMITTED", "PENDING", "RUNNABLE"]: + job.terminate(reason) + # No-Op for jobs that have already started - user has to explicitly terminate those + def terminate_job(self, job_id, reason): if job_id is None: raise ClientException("Job ID does not exist") diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 200ace8b8..25afbc365 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -292,7 +292,9 @@ class BatchResponse(BaseResponse): return "" # CancelJob - def canceljob( - self, - ): # Theres some AWS semantics on the differences but for us they're identical ;-) - return self.terminatejob() + def canceljob(self): + job_id = self._get_param("jobId") + reason = self._get_param("reason") + self.batch_backend.cancel_job(job_id, reason) + + return "" diff --git a/tests/test_batch/test_batch_job_queue.py b/tests/test_batch/test_batch_job_queue.py index 41612baf8..db9c4036c 100644 --- a/tests/test_batch/test_batch_job_queue.py +++ b/tests/test_batch/test_batch_job_queue.py @@ 
-125,6 +125,93 @@ def test_create_job_queue_without_compute_environment(): err["Message"].should.equal("At least 1 compute environment must be provided") +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_describe_job_queue_unknown_value(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + + resp = batch_client.describe_job_queues(jobQueues=["test_invalid_queue"]) + resp.should.have.key("jobQueues").being.length_of(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue_twice(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = "test_compute_env" + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type="UNMANAGED", + state="ENABLED", + serviceRole=iam_arn, + ) + compute_env_arn = resp["computeEnvironmentArn"] + + batch_client.create_job_queue( + jobQueueName="test_job_queue", + state="ENABLED", + priority=123, + computeEnvironmentOrder=[{"order": 123, "computeEnvironment": compute_env_arn}], + ) + with pytest.raises(ClientError) as ex: + batch_client.create_job_queue( + jobQueueName="test_job_queue", + state="ENABLED", + priority=123, + computeEnvironmentOrder=[ + {"order": 123, "computeEnvironment": compute_env_arn} + ], + ) + + err = ex.value.response["Error"] + err["Code"].should.equal("ClientException") + err["Message"].should.equal("Job queue test_job_queue already exists") + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue_incorrect_state(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + + with pytest.raises(ClientError) as ex: + batch_client.create_job_queue( + jobQueueName="test_job_queue2", + state="JUNK", + priority=123, + computeEnvironmentOrder=[], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ClientException") + err["Message"].should.equal("state JUNK 
must be one of ENABLED | DISABLED") + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue_without_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + + with pytest.raises(ClientError) as ex: + batch_client.create_job_queue( + jobQueueName="test_job_queue3", + state="ENABLED", + priority=123, + computeEnvironmentOrder=[], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ClientException") + err["Message"].should.equal("At least 1 compute environment must be provided") + + @mock_ec2 @mock_ecs @mock_iam diff --git a/tests/test_batch/test_batch_jobs.py b/tests/test_batch/test_batch_jobs.py index 09b3dd329..6ef5bf519 100644 --- a/tests/test_batch/test_batch_jobs.py +++ b/tests/test_batch/test_batch_jobs.py @@ -95,34 +95,9 @@ def test_submit_job(): ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - compute_name = "test_compute_env" - resp = batch_client.create_compute_environment( - computeEnvironmentName=compute_name, - type="UNMANAGED", - state="ENABLED", - serviceRole=iam_arn, - ) - arn = resp["computeEnvironmentArn"] - - resp = batch_client.create_job_queue( - jobQueueName="test_job_queue", - state="ENABLED", - priority=123, - computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], - ) - queue_arn = resp["jobQueueArn"] - - resp = batch_client.register_job_definition( - jobDefinitionName="sayhellotomylittlefriend", - type="container", - containerProperties={ - "image": "busybox:latest", - "vcpus": 1, - "memory": 128, - "command": ["echo", "hello"], - }, - ) - job_def_arn = resp["jobDefinitionArn"] + job_def_name = "sayhellotomylittlefriend" + commands = ["echo", "hello"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) resp = batch_client.submit_job( jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn @@ -132,7 +107,7 @@ 
def test_submit_job(): _wait_for_job_status(batch_client, job_id, "SUCCEEDED") resp = logs_client.describe_log_streams( - logGroupName="/aws/batch/job", logStreamNamePrefix="sayhellotomylittlefriend" + logGroupName="/aws/batch/job", logStreamNamePrefix=job_def_name ) resp["logStreams"].should.have.length_of(1) ls_name = resp["logStreams"][0]["logStreamName"] @@ -153,34 +128,9 @@ def test_list_jobs(): ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - compute_name = "test_compute_env" - resp = batch_client.create_compute_environment( - computeEnvironmentName=compute_name, - type="UNMANAGED", - state="ENABLED", - serviceRole=iam_arn, - ) - arn = resp["computeEnvironmentArn"] - - resp = batch_client.create_job_queue( - jobQueueName="test_job_queue", - state="ENABLED", - priority=123, - computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], - ) - queue_arn = resp["jobQueueArn"] - - resp = batch_client.register_job_definition( - jobDefinitionName="sleep5", - type="container", - containerProperties={ - "image": "busybox:latest", - "vcpus": 1, - "memory": 128, - "command": ["sleep", "5"], - }, - ) - job_def_arn = resp["jobDefinitionArn"] + job_def_name = "sleep5" + commands = ["sleep", "5"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) resp = batch_client.submit_job( jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn @@ -213,34 +163,9 @@ def test_terminate_job(): ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - compute_name = "test_compute_env" - resp = batch_client.create_compute_environment( - computeEnvironmentName=compute_name, - type="UNMANAGED", - state="ENABLED", - serviceRole=iam_arn, - ) - arn = resp["computeEnvironmentArn"] - - resp = batch_client.create_job_queue( - jobQueueName="test_job_queue", - 
state="ENABLED", - priority=123, - computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], - ) - queue_arn = resp["jobQueueArn"] - - resp = batch_client.register_job_definition( - jobDefinitionName="echo-sleep-echo", - type="container", - containerProperties={ - "image": "busybox:latest", - "vcpus": 1, - "memory": 128, - "command": ["sh", "-c", "echo start && sleep 30 && echo stop"], - }, - ) - job_def_arn = resp["jobDefinitionArn"] + job_def_name = "echo-sleep-echo" + commands = ["sh", "-c", "echo start && sleep 30 && echo stop"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) resp = batch_client.submit_job( jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn @@ -259,7 +184,7 @@ def test_terminate_job(): resp["jobs"][0]["statusReason"].should.equal("test_terminate") resp = logs_client.describe_log_streams( - logGroupName="/aws/batch/job", logStreamNamePrefix="echo-sleep-echo" + logGroupName="/aws/batch/job", logStreamNamePrefix=job_def_name ) resp["logStreams"].should.have.length_of(1) ls_name = resp["logStreams"][0]["logStreamName"] @@ -273,6 +198,74 @@ def test_terminate_job(): resp["events"][0]["message"].should.equal("start") +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_cancel_pending_job(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + # We need to be able to cancel a job that has not been started yet + # Locally, our jobs start so fast that we can't cancel them in time + # So delay our job, by letting it depend on a slow-running job + commands = ["sleep", "1"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, "deptest") + + resp = batch_client.submit_job( + jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn + ) + delayed_job = resp["jobId"] + + depends_on = [{"jobId": delayed_job, "type": "SEQUENTIAL"}] + resp = 
batch_client.submit_job( + jobName="test_job_name", + jobQueue=queue_arn, + jobDefinition=job_def_arn, + dependsOn=depends_on, + ) + job_id = resp["jobId"] + + batch_client.cancel_job(jobId=job_id, reason="test_cancel") + _wait_for_job_status(batch_client, job_id, "FAILED", seconds_to_wait=10) + + resp = batch_client.describe_jobs(jobs=[job_id]) + resp["jobs"][0]["jobName"].should.equal("test_job_name") + resp["jobs"][0]["statusReason"].should.equal("test_cancel") + + +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_cancel_running_job(): + """ + Test verifies that the moment the job has started, we can't cancel anymore + """ + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + job_def_name = "echo-o-o" + commands = ["echo", "start"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) + + resp = batch_client.submit_job( + jobName="test_job_name", jobQueue=queue_arn, jobDefinition=job_def_arn + ) + job_id = resp["jobId"] + _wait_for_job_status(batch_client, job_id, "STARTING") + + batch_client.cancel_job(jobId=job_id, reason="test_cancel") + # We cancelled too late, the job was already running. 
Now we just wait for it to succeed + _wait_for_job_status(batch_client, job_id, "SUCCEEDED", seconds_to_wait=5) + + resp = batch_client.describe_jobs(jobs=[job_id]) + resp["jobs"][0]["jobName"].should.equal("test_job_name") + resp["jobs"][0].shouldnt.have.key("statusReason") + + def _wait_for_job_status(client, job_id, status, seconds_to_wait=30): wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) last_job_status = None @@ -298,34 +291,9 @@ def test_failed_job(): ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - compute_name = "test_compute_env" - resp = batch_client.create_compute_environment( - computeEnvironmentName=compute_name, - type="UNMANAGED", - state="ENABLED", - serviceRole=iam_arn, - ) - arn = resp["computeEnvironmentArn"] - - resp = batch_client.create_job_queue( - jobQueueName="test_job_queue", - state="ENABLED", - priority=123, - computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], - ) - queue_arn = resp["jobQueueArn"] - - resp = batch_client.register_job_definition( - jobDefinitionName="sayhellotomylittlefriend", - type="container", - containerProperties={ - "image": "busybox:latest", - "vcpus": 1, - "memory": 128, - "command": ["exit", "1"], - }, - ) - job_def_arn = resp["jobDefinitionArn"] + job_def_name = "exit-1" + commands = ["exit", "1"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) resp = batch_client.submit_job( jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn @@ -355,34 +323,12 @@ def test_dependencies(): ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - compute_name = "test_compute_env" - resp = batch_client.create_compute_environment( - computeEnvironmentName=compute_name, - type="UNMANAGED", - state="ENABLED", - serviceRole=iam_arn, + 
job_def_arn, queue_arn = prepare_job( + batch_client, + commands=["echo", "hello"], + iam_arn=iam_arn, + job_def_name="dependencytest", ) - arn = resp["computeEnvironmentArn"] - - resp = batch_client.create_job_queue( - jobQueueName="test_job_queue", - state="ENABLED", - priority=123, - computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], - ) - queue_arn = resp["jobQueueArn"] - - resp = batch_client.register_job_definition( - jobDefinitionName="sayhellotomylittlefriend", - type="container", - containerProperties={ - "image": "busybox:latest", - "vcpus": 1, - "memory": 128, - "command": ["echo", "hello"], - }, - ) - job_def_arn = resp["jobDefinitionArn"] resp = batch_client.submit_job( jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn @@ -651,3 +597,34 @@ def test_container_overrides(): sure.expect(env_var).to.contain({"name": "TEST2", "value": "from job"}) sure.expect(env_var).to.contain({"name": "AWS_BATCH_JOB_ID", "value": job_id}) + + +def prepare_job(batch_client, commands, iam_arn, job_def_name): + compute_name = "test_compute_env" + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type="UNMANAGED", + state="ENABLED", + serviceRole=iam_arn, + ) + arn = resp["computeEnvironmentArn"] + + resp = batch_client.create_job_queue( + jobQueueName="test_job_queue", + state="ENABLED", + priority=123, + computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}], + ) + queue_arn = resp["jobQueueArn"] + resp = batch_client.register_job_definition( + jobDefinitionName=job_def_name, + type="container", + containerProperties={ + "image": "busybox:latest", + "vcpus": 1, + "memory": 128, + "command": commands, + }, + ) + job_def_arn = resp["jobDefinitionArn"] + return job_def_arn, queue_arn