Implemented Terminate, Cancel and List jobs
parent ddd52a5a97
commit e3024ae1ba
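This diff wires three new Batch operations (ListJobs, TerminateJob, CancelJob) through the backend models, the response handlers and the URL table, and adds slow end-to-end tests. As a quick orientation, here is a minimal sketch of driving the new operations through boto3 against the mock; the compute environment, queue, job definition and job setup are omitted, and the names and job ID are illustrative (the full setup is shown in the tests at the bottom of the diff):

import boto3
from moto import mock_batch

with mock_batch():
    client = boto3.client('batch', region_name='us-east-1')

    # ... create a compute environment, job queue and job definition, and
    # submit a job first (omitted here; see the tests below) ...

    # ListJobs: summaries of the jobs on a queue, optionally filtered by status
    summaries = client.list_jobs(jobQueue='test_job_queue', jobStatus='RUNNING')

    # TerminateJob and CancelJob both route to BatchBackend.terminate_job in this mock
    client.terminate_job(jobId='<job-id>', reason='shutting down')
    client.cancel_job(jobId='<job-id>', reason='no longer needed')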
@@ -196,6 +196,7 @@ class Job(threading.Thread, BaseModel):
         self.job_started_at = datetime.datetime(1970, 1, 1)
         self.job_stopped_at = datetime.datetime(1970, 1, 1)
         self.job_stopped = False
+        self.job_stopped_reason = None
 
         self.stop = False
 
@@ -230,6 +231,8 @@ class Job(threading.Thread, BaseModel):
         }
         if self.job_stopped:
             result['stoppedAt'] = datetime2int(self.job_stopped_at)
+            if self.job_stopped_reason is not None:
+                result['statusReason'] = self.job_stopped_reason
         return result
 
     def run(self):
@@ -328,6 +331,11 @@ class Job(threading.Thread, BaseModel):
         self.job_stopped = True
         self.job_stopped_at = datetime.datetime.now()
 
+    def terminate(self, reason):
+        if not self.stop:
+            self.stop = True
+            self.job_stopped_reason = reason
+
 
 class BatchBackend(BaseBackend):
     def __init__(self, region_name=None):
@@ -478,6 +486,20 @@ class BatchBackend(BaseBackend):
 
         return result
 
+    def get_job_by_id(self, identifier):
+        """
+        Get job by id
+        :param identifier: Job ID
+        :type identifier: str
+
+        :return: Job
+        :rtype: Job
+        """
+        try:
+            return self._jobs[identifier]
+        except KeyError:
+            return None
+
     def describe_compute_environments(self, environments=None, max_results=None, next_token=None):
         envs = set()
         if environments is not None:
@@ -916,6 +938,36 @@ class BatchBackend(BaseBackend):
 
         return result
 
+    def list_jobs(self, job_queue, job_status=None, max_results=None, next_token=None):
+        jobs = []
+
+        job_queue = self.get_job_queue(job_queue)
+        if job_queue is None:
+            raise ClientException('Job queue {0} does not exist'.format(job_queue))
+
+        if job_status is not None and job_status not in ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED'):
+            raise ClientException('Job status is not one of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED')
+
+        for job in job_queue.jobs:
+            if job_status is not None and job.job_state != job_status:
+                continue
+
+            jobs.append(job)
+
+        return jobs
+
+    def terminate_job(self, job_id, reason):
+        if job_id is None:
+            raise ClientException('Job ID does not exist')
+        if reason is None:
+            raise ClientException('Reason does not exist')
+
+        job = self.get_job_by_id(job_id)
+        if job is None:
+            raise ClientException('Job not found')
+
+        job.terminate(reason)
+
+
 available_regions = boto3.session.Session().get_available_regions("batch")
 batch_backends = {region: BatchBackend(region_name=region) for region in available_regions}
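The same two backend methods can also be driven directly on a backend instance; a minimal sketch, assuming a job queue named 'test_job_queue' with submitted jobs already exists in that backend and using an illustrative region key:

from moto.batch.models import batch_backends

backend = batch_backends['us-east-1']

# Job objects whose current state matches the filter
running = backend.list_jobs('test_job_queue', job_status='RUNNING')
for job in running:
    # flags each job to stop and records the reason, which later surfaces
    # as statusReason in the describe output
    backend.terminate_job(job.job_id, reason='cleanup')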
@@ -263,3 +263,42 @@ class BatchResponse(BaseResponse):
             return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)})
         except AWSError as err:
             return err.response()
+
+    # ListJobs
+    def listjobs(self):
+        job_queue = self._get_param('jobQueue')
+        job_status = self._get_param('jobStatus')
+        max_results = self._get_param('maxResults')
+        next_token = self._get_param('nextToken')
+
+        try:
+            jobs = self.batch_backend.list_jobs(job_queue, job_status, max_results, next_token)
+        except AWSError as err:
+            return err.response()
+
+        result = {'jobSummaryList': [{'jobId': job.job_id, 'jobName': job.job_name} for job in jobs]}
+        return json.dumps(result)
+
+    # TerminateJob
+    def terminatejob(self):
+        job_id = self._get_param('jobId')
+        reason = self._get_param('reason')
+
+        try:
+            self.batch_backend.terminate_job(job_id, reason)
+        except AWSError as err:
+            return err.response()
+
+        return ''
+
+    # CancelJob
+    def canceljob(self):  # There's some AWS semantics on the differences but for us they're identical ;-)
+        job_id = self._get_param('jobId')
+        reason = self._get_param('reason')
+
+        try:
+            self.batch_backend.terminate_job(job_id, reason)
+        except AWSError as err:
+            return err.response()
+
+        return ''
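Note that the listjobs handler above serializes only a job ID and name per entry, so a mocked ListJobs response body looks roughly like the following (IDs illustrative, fewer fields than the real AWS response):

{
    'jobSummaryList': [
        {'jobId': '<uuid-of-test1>', 'jobName': 'test1'},
        {'jobId': '<uuid-of-test2>', 'jobName': 'test2'}
    ]
}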
@@ -18,5 +18,8 @@ url_paths = {
     '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch,
     '{0}/v1/describejobdefinitions': BatchResponse.dispatch,
     '{0}/v1/submitjob': BatchResponse.dispatch,
-    '{0}/v1/describejobs': BatchResponse.dispatch
+    '{0}/v1/describejobs': BatchResponse.dispatch,
+    '{0}/v1/listjobs': BatchResponse.dispatch,
+    '{0}/v1/terminatejob': BatchResponse.dispatch,
+    '{0}/v1/canceljob': BatchResponse.dispatch,
 }
@@ -583,7 +583,7 @@ def test_describe_task_definition():
     len(resp['jobDefinitions']).should.equal(3)
 
 
-# SLOW TEST
+# SLOW TESTS
 @expected_failure
 @mock_logs
 @mock_ec2
@@ -654,4 +654,156 @@ def test_submit_job():
     ls_name = resp['logStreams'][0]['logStreamName']
 
     resp = logs_client.get_log_events(logGroupName='/aws/batch/job', logStreamName=ls_name)
     len(resp['events']).should.be.greater_than(5)
+
+
+@expected_failure
+@mock_logs
+@mock_ec2
+@mock_ecs
+@mock_iam
+@mock_batch
+def test_list_jobs():
+    ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
+    vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
+
+    compute_name = 'test_compute_env'
+    resp = batch_client.create_compute_environment(
+        computeEnvironmentName=compute_name,
+        type='UNMANAGED',
+        state='ENABLED',
+        serviceRole=iam_arn
+    )
+    arn = resp['computeEnvironmentArn']
+
+    resp = batch_client.create_job_queue(
+        jobQueueName='test_job_queue',
+        state='ENABLED',
+        priority=123,
+        computeEnvironmentOrder=[
+            {
+                'order': 123,
+                'computeEnvironment': arn
+            },
+        ]
+    )
+    queue_arn = resp['jobQueueArn']
+
+    resp = batch_client.register_job_definition(
+        jobDefinitionName='sleep10',
+        type='container',
+        containerProperties={
+            'image': 'busybox',
+            'vcpus': 1,
+            'memory': 128,
+            'command': ['sleep', '10']
+        }
+    )
+    job_def_arn = resp['jobDefinitionArn']
+
+    resp = batch_client.submit_job(
+        jobName='test1',
+        jobQueue=queue_arn,
+        jobDefinition=job_def_arn
+    )
+    job_id1 = resp['jobId']
+    resp = batch_client.submit_job(
+        jobName='test2',
+        jobQueue=queue_arn,
+        jobDefinition=job_def_arn
+    )
+    job_id2 = resp['jobId']
+
+    future = datetime.datetime.now() + datetime.timedelta(seconds=30)
+
+    resp_finished_jobs = batch_client.list_jobs(
+        jobQueue=queue_arn,
+        jobStatus='SUCCEEDED'
+    )
+
+    # Wait only as long as it takes to run the jobs
+    while datetime.datetime.now() < future:
+        resp = batch_client.describe_jobs(jobs=[job_id1, job_id2])
+
+        any_failed_jobs = any([job['status'] == 'FAILED' for job in resp['jobs']])
+        succeeded_jobs = all([job['status'] == 'SUCCEEDED' for job in resp['jobs']])
+
+        if any_failed_jobs:
+            raise RuntimeError('A Batch job failed')
+        if succeeded_jobs:
+            break
+        time.sleep(0.5)
+    else:
+        raise RuntimeError('Batch jobs timed out')
+
+    resp_finished_jobs2 = batch_client.list_jobs(
+        jobQueue=queue_arn,
+        jobStatus='SUCCEEDED'
+    )
+
+    len(resp_finished_jobs['jobSummaryList']).should.equal(0)
+    len(resp_finished_jobs2['jobSummaryList']).should.equal(2)
+
+
+@expected_failure
+@mock_logs
+@mock_ec2
+@mock_ecs
+@mock_iam
+@mock_batch
+def test_terminate_job():
+    ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
+    vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
+
+    compute_name = 'test_compute_env'
+    resp = batch_client.create_compute_environment(
+        computeEnvironmentName=compute_name,
+        type='UNMANAGED',
+        state='ENABLED',
+        serviceRole=iam_arn
+    )
+    arn = resp['computeEnvironmentArn']
+
+    resp = batch_client.create_job_queue(
+        jobQueueName='test_job_queue',
+        state='ENABLED',
+        priority=123,
+        computeEnvironmentOrder=[
+            {
+                'order': 123,
+                'computeEnvironment': arn
+            },
+        ]
+    )
+    queue_arn = resp['jobQueueArn']
+
+    resp = batch_client.register_job_definition(
+        jobDefinitionName='sleep10',
+        type='container',
+        containerProperties={
+            'image': 'busybox',
+            'vcpus': 1,
+            'memory': 128,
+            'command': ['sleep', '10']
+        }
+    )
+    job_def_arn = resp['jobDefinitionArn']
+
+    resp = batch_client.submit_job(
+        jobName='test1',
+        jobQueue=queue_arn,
+        jobDefinition=job_def_arn
+    )
+    job_id = resp['jobId']
+
+    time.sleep(2)
+
+    batch_client.terminate_job(jobId=job_id, reason='test_terminate')
+
+    time.sleep(1)
+
+    resp = batch_client.describe_jobs(jobs=[job_id])
+    resp['jobs'][0]['jobName'].should.equal('test1')
+    resp['jobs'][0]['status'].should.equal('FAILED')
+    resp['jobs'][0]['statusReason'].should.equal('test_terminate')
+