diff --git a/moto/batch/models.py b/moto/batch/models.py index be8fca9d1..7f75225f7 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -196,6 +196,7 @@ class Job(threading.Thread, BaseModel): self.job_started_at = datetime.datetime(1970, 1, 1) self.job_stopped_at = datetime.datetime(1970, 1, 1) self.job_stopped = False + self.job_stopped_reason = None self.stop = False @@ -230,6 +231,8 @@ class Job(threading.Thread, BaseModel): } if self.job_stopped: result['stoppedAt'] = datetime2int(self.job_stopped_at) + if self.job_stopped_reason is not None: + result['statusReason'] = self.job_stopped_reason return result def run(self): @@ -328,6 +331,11 @@ class Job(threading.Thread, BaseModel): self.job_stopped = True self.job_stopped_at = datetime.datetime.now() + def terminate(self, reason): + if not self.stop: + self.stop = True + self.job_stopped_reason = reason + class BatchBackend(BaseBackend): def __init__(self, region_name=None): @@ -478,6 +486,20 @@ class BatchBackend(BaseBackend): return result + def get_job_by_id(self, identifier): + """ + Get job by id + :param identifier: Job ID + :type identifier: str + + :return: Job + :rtype: Job + """ + try: + return self._jobs[identifier] + except KeyError: + return None + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -916,6 +938,36 @@ class BatchBackend(BaseBackend): return result + def list_jobs(self, job_queue, job_status=None, max_results=None, next_token=None): + jobs = [] + + job_queue = self.get_job_queue(job_queue) + if job_queue is None: + raise ClientException('Job queue {0} does not exist'.format(job_queue)) + + if job_status is not None and job_status not in ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED'): + raise ClientException('Job status is not one of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED') + + for job in job_queue.jobs: + if job_status is not None and job.job_state != job_status: + continue + + jobs.append(job) + + return jobs + + def terminate_job(self, job_id, reason): + if job_id is None: + raise ClientException('Job ID does not exist') + if reason is None: + raise ClientException('Reason does not exist') + + job = self.get_job_by_id(job_id) + if job is None: + raise ClientException('Job not found') + + job.terminate(reason) + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 2bec7ddf1..96094068d 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -263,3 +263,42 @@ class BatchResponse(BaseResponse): return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)}) except AWSError as err: return err.response() + + # ListJobs + def listjobs(self): + job_queue = self._get_param('jobQueue') + job_status = self._get_param('jobStatus') + max_results = self._get_param('maxResults') + next_token = self._get_param('nextToken') + + try: + jobs = self.batch_backend.list_jobs(job_queue, job_status, max_results, next_token) + except AWSError as err: + return err.response() + + result = {'jobSummaryList': [{'jobId': job.job_id, 'jobName': job.job_name} for job in jobs]} + return json.dumps(result) + + # TerminateJob + def terminatejob(self): + job_id = self._get_param('jobId') + reason = self._get_param('reason') + + try: + self.batch_backend.terminate_job(job_id, reason) + except AWSError as err: + return err.response() + + return '' + + # CancelJob + def canceljob(self): # Theres some AWS semantics on the differences but for us they're identical ;-) + job_id = self._get_param('jobId') + reason = self._get_param('reason') + + try: + self.batch_backend.terminate_job(job_id, reason) + except AWSError as err: + return err.response() + + return '' diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 924e55e6d..c64086ef2 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -18,5 +18,8 @@ url_paths = { '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch, '{0}/v1/describejobdefinitions': BatchResponse.dispatch, '{0}/v1/submitjob': BatchResponse.dispatch, - '{0}/v1/describejobs': BatchResponse.dispatch + '{0}/v1/describejobs': BatchResponse.dispatch, + '{0}/v1/listjobs': BatchResponse.dispatch, + '{0}/v1/terminatejob': BatchResponse.dispatch, + '{0}/v1/canceljob': BatchResponse.dispatch, } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index acbe75e94..ec24cd911 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -583,7 +583,7 @@ def test_describe_task_definition(): len(resp['jobDefinitions']).should.equal(3) -# SLOW TEST +# SLOW TESTS @expected_failure @mock_logs @mock_ec2 @@ -654,4 +654,156 @@ def test_submit_job(): ls_name = resp['logStreams'][0]['logStreamName'] resp = logs_client.get_log_events(logGroupName='/aws/batch/job', logStreamName=ls_name) - len(resp['events']).should.be.greater_than(5) \ No newline at end of file + len(resp['events']).should.be.greater_than(5) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_list_jobs(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id1 = resp['jobId'] + resp = batch_client.submit_job( + jobName='test2', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id2 = resp['jobId'] + + future = datetime.datetime.now() + datetime.timedelta(seconds=30) + + resp_finished_jobs = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + # Wait only as long as it takes to run the jobs + while datetime.datetime.now() < future: + resp = batch_client.describe_jobs(jobs=[job_id1, job_id2]) + + any_failed_jobs = any([job['status'] == 'FAILED' for job in resp['jobs']]) + succeeded_jobs = all([job['status'] == 'SUCCEEDED' for job in resp['jobs']]) + + if any_failed_jobs: + raise RuntimeError('A Batch job failed') + if succeeded_jobs: + break + time.sleep(0.5) + else: + raise RuntimeError('Batch jobs timed out') + + resp_finished_jobs2 = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + len(resp_finished_jobs['jobSummaryList']).should.equal(0) + len(resp_finished_jobs2['jobSummaryList']).should.equal(2) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_terminate_job(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id = resp['jobId'] + + time.sleep(2) + + batch_client.terminate_job(jobId=job_id, reason='test_terminate') + + time.sleep(1) + + resp = batch_client.describe_jobs(jobs=[job_id]) + resp['jobs'][0]['jobName'].should.equal('test1') + resp['jobs'][0]['status'].should.equal('FAILED') + resp['jobs'][0]['statusReason'].should.equal('test_terminate') +