Merge pull request #2359 from bkovacki/batch_submit_job_with_job_definition_name

Add option to call batch submit_job with job definition name only
This commit is contained in:
Mike Grima 2019-08-07 15:13:52 -07:00 committed by GitHub
commit 2358433f74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 11 deletions

View File

@ -514,10 +514,13 @@ class BatchBackend(BaseBackend):
return self._job_definitions.get(arn)
def get_job_definition_by_name(self, name):
for comp_env in self._job_definitions.values():
if comp_env.name == name:
return comp_env
return None
latest_revision = -1
latest_job = None
for job_def in self._job_definitions.values():
if job_def.name == name and job_def.revision > latest_revision:
latest_job = job_def
latest_revision = job_def.revision
return latest_job
def get_job_definition_by_name_revision(self, name, revision):
for job_def in self._job_definitions.values():
@ -534,10 +537,13 @@ class BatchBackend(BaseBackend):
:return: Job definition or None
:rtype: JobDefinition or None
"""
env = self.get_job_definition_by_arn(identifier)
if env is None:
env = self.get_job_definition_by_name(identifier)
return env
job_def = self.get_job_definition_by_arn(identifier)
if job_def is None:
if ':' in identifier:
job_def = self.get_job_definition_by_name_revision(*identifier.split(':', 1))
else:
job_def = self.get_job_definition_by_name(identifier)
return job_def
def get_job_definitions(self, identifier):
"""
@ -984,9 +990,7 @@ class BatchBackend(BaseBackend):
# TODO parameters, retries (which is a dict raw from request), job dependancies and container overrides are ignored for now
# Look for job definition
job_def = self.get_job_definition_by_arn(job_def_id)
if job_def is None and ':' in job_def_id:
job_def = self.get_job_definition_by_name_revision(*job_def_id.split(':', 1))
job_def = self.get_job_definition(job_def_id)
if job_def is None:
raise ClientException('Job definition {0} does not exist'.format(job_def_id))

View File

@ -642,6 +642,87 @@ def test_describe_task_definition():
len(resp['jobDefinitions']).should.equal(3)
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_submit_job_by_name():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = 'test_compute_env'
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type='UNMANAGED',
state='ENABLED',
serviceRole=iam_arn
)
arn = resp['computeEnvironmentArn']
resp = batch_client.create_job_queue(
jobQueueName='test_job_queue',
state='ENABLED',
priority=123,
computeEnvironmentOrder=[
{
'order': 123,
'computeEnvironment': arn
},
]
)
queue_arn = resp['jobQueueArn']
job_definition_name = 'sleep10'
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 128,
'command': ['sleep', '10']
}
)
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 256,
'command': ['sleep', '10']
}
)
resp = batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 512,
'command': ['sleep', '10']
}
)
job_definition_arn = resp['jobDefinitionArn']
resp = batch_client.submit_job(
jobName='test1',
jobQueue=queue_arn,
jobDefinition=job_definition_name
)
job_id = resp['jobId']
resp_jobs = batch_client.describe_jobs(jobs=[job_id])
# batch_client.terminate_job(jobId=job_id)
len(resp_jobs['jobs']).should.equal(1)
resp_jobs['jobs'][0]['jobId'].should.equal(job_id)
resp_jobs['jobs'][0]['jobQueue'].should.equal(queue_arn)
resp_jobs['jobs'][0]['jobDefinition'].should.equal(job_definition_arn)
# SLOW TESTS
@expected_failure
@mock_logs