Added DescribeJobDefinitions

This commit is contained in:
Terry Cain 2017-10-05 00:00:40 +01:00
parent 558f246115
commit 0ca3fcc7a2
No known key found for this signature in database
GPG Key ID: 14D90844E4E9B9F3
4 changed files with 262 additions and 24 deletions

View File

@ -87,29 +87,30 @@ class JobDefinition(BaseModel):
self._region = region_name
self.container_properties = container_properties
self.arn = None
self.status = 'INACTIVE'
self.parameters = {}
if parameters is not None:
if not isinstance(parameters, dict):
raise ClientException('parameters must be a string to string map')
self.parameters = parameters
if _type not in ('container',):
raise ClientException('type must be one of "container"')
if parameters is None:
parameters = {}
self.parameters = parameters
self._validate()
self._update_arn()
# For future use when containers arnt the only thing in batch
if _type != 'container':
raise NotImplementedError()
self._validate_container_properties()
def _update_arn(self):
self.revision += 1
self.arn = make_arn_for_task_def(DEFAULT_ACCOUNT_ID, self.name, self.revision, self._region)
def _validate_container_properties(self):
def _validate(self):
if self.type not in ('container',):
raise ClientException('type must be one of "container"')
# For future use when containers arnt the only thing in batch
if self.type != 'container':
raise NotImplementedError()
if not isinstance(self.parameters, dict):
raise ClientException('parameters must be a string to string map')
if 'image' not in self.container_properties:
raise ClientException('containerProperties must contain image')
@ -123,6 +124,37 @@ class JobDefinition(BaseModel):
if self.container_properties['vcpus'] < 1:
raise ClientException('container vcpus limit must be greater than 0')
def update(self, parameters, _type, container_properties, retry_strategy):
if parameters is None:
parameters = self.parameters
if _type is None:
_type = self.type
if container_properties is None:
container_properties = self.container_properties
if retry_strategy is None:
retry_strategy = self.retries
return JobDefinition(self.name, parameters, _type, container_properties, region_name=self._region, revision=self.revision, retry_strategy=retry_strategy)
def describe(self):
result = {
'jobDefinitionArn': self.arn,
'jobDefinitionName': self.name,
'parameters': self.parameters,
'revision': self.revision,
'status': self.status,
'type': self.type
}
if self.container_properties is not None:
result['containerProperties'] = self.container_properties
if self.retries is not None and self.retries > 0:
result['retryStrategy'] = {'attempts': self.retries}
return result
class BatchBackend(BaseBackend):
def __init__(self, region_name=None):
@ -211,26 +243,52 @@ class BatchBackend(BaseBackend):
def get_job_definition_by_arn(self, arn):
return self._job_definitions.get(arn)
def get_job_definition_by_name(self, name):
def get_job_definition_by_name(self, name):#
for comp_env in self._job_definitions.values():
if comp_env.name == name:
return comp_env
return None
def get_job_definition_by_name_revision(self, name, revision):
for job_def in self._job_definitions.values():
if job_def.name == name and job_def.revision == revision:
return job_def
return None
def get_job_definition(self, identifier):
"""
Get job queue by name or ARN
Get job defintiion by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Job Queue or None
:rtype: JobQueue or None
:return: Job definition or None
:rtype: JobDefinition or None
"""
env = self.get_job_definition_by_arn(identifier)
if env is None:
env = self.get_job_definition_by_name(identifier)
return env
def get_job_definitions(self, identifier):
"""
Get job defintiion by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Job definition or None
:rtype: list of JobDefinition
"""
result = []
env = self.get_job_definition_by_arn(identifier)
if env is not None:
result.append(env)
else:
for value in self._job_definitions.values():
if value.name == identifier:
result.append(value)
return result
def describe_compute_environments(self, environments=None, max_results=None, next_token=None):
envs = set()
if environments is not None:
@ -586,20 +644,53 @@ class BatchBackend(BaseBackend):
if def_name is None:
raise ClientException('jobDefinitionName must be provided')
if self.get_job_definition_by_name(def_name) is not None:
raise ClientException('A job definition called {0} already exists'.format(def_name))
job_def = self.get_job_definition_by_name(def_name)
if retry_strategy is not None:
try:
retry_strategy = retry_strategy['attempts']
except Exception:
raise ClientException('retryStrategy is malformed')
job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy)
if job_def is None:
job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy)
else:
# Make new jobdef
job_def = job_def.update(parameters, _type, container_properties, retry_strategy)
self._job_definitions[job_def.arn] = job_def
return def_name, job_def.arn, job_def.revision
def deregister_job_definition(self, def_name):
job_def = self.get_job_definition_by_arn(def_name)
if job_def is None and ':' in def_name:
name, revision = def_name.split(':', 1)
job_def = self.get_job_definition_by_name_revision(name, revision)
if job_def is not None:
del self._job_definitions[job_def.arn]
def describe_job_definitions(self, job_def_name=None, job_def_list=None, status=None, max_results=None, next_token=None):
jobs = []
# As a job name can reference multiple revisions, we get a list of them
if job_def_name is not None:
job_def = self.get_job_definitions(job_def_name)
if job_def is not None:
jobs.extend(job_def)
elif job_def_list is not None:
for job in job_def_list:
job_def = self.get_job_definitions(job)
if job_def is not None:
jobs.extend(job_def)
else:
jobs.extend(self._job_definitions.values())
# Got all the job defs were after, filter then by status
if status is not None:
return [job for job in jobs if job.status == status]
return jobs
available_regions = boto3.session.Session().get_available_regions("batch")
batch_backends = {region: BatchBackend(region_name=region) for region in available_regions}

View File

@ -205,3 +205,24 @@ class BatchResponse(BaseResponse):
}
return json.dumps(result)
# DeregisterJobDefinition
def deregisterjobdefinition(self):
queue_name = self._get_param('jobDefinition')
self.batch_backend.deregister_job_definition(queue_name)
return ''
# DescribeJobDefinitions
def describejobdefinitions(self):
job_def_name = self._get_param('jobDefinitionName')
job_def_list = self._get_param('jobDefinitions')
max_results = self._get_param('maxResults')
next_token = self._get_param('nextToken')
status = self._get_param('status')
job_defs = self.batch_backend.describe_job_definitions(job_def_name, job_def_list, status, max_results, next_token)
result = {'jobDefinitions': [job.describe() for job in job_defs]}
return json.dumps(result)

View File

@ -14,5 +14,7 @@ url_paths = {
'{0}/v1/describejobqueues': BatchResponse.dispatch,
'{0}/v1/updatejobqueue': BatchResponse.dispatch,
'{0}/v1/deletejobqueue': BatchResponse.dispatch,
'{0}/v1/registerjobdefinition': BatchResponse.dispatch
'{0}/v1/registerjobdefinition': BatchResponse.dispatch,
'{0}/v1/deregisterjobdefinition': BatchResponse.dispatch,
'{0}/v1/describejobdefinitions': BatchResponse.dispatch
}

View File

@ -447,3 +447,127 @@ def test_register_task_definition():
resp.should.contain('revision')
assert resp['jobDefinitionArn'].endswith('{0}:{1}'.format(resp['jobDefinitionName'], resp['revision']))
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_reregister_task_definition():
# Reregistering task with the same name bumps the revision number
ec2_client, iam_client, ecs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
resp1 = batch_client.register_job_definition(
jobDefinitionName='sleep10',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 128,
'command': ['sleep', '10']
}
)
resp1.should.contain('jobDefinitionArn')
resp1.should.contain('jobDefinitionName')
resp1.should.contain('revision')
assert resp1['jobDefinitionArn'].endswith('{0}:{1}'.format(resp1['jobDefinitionName'], resp1['revision']))
resp1['revision'].should.equal(1)
resp2 = batch_client.register_job_definition(
jobDefinitionName='sleep10',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 68,
'command': ['sleep', '10']
}
)
resp2['revision'].should.equal(2)
resp2['jobDefinitionArn'].should_not.equal(resp1['jobDefinitionArn'])
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_delete_task_definition():
ec2_client, iam_client, ecs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
resp = batch_client.register_job_definition(
jobDefinitionName='sleep10',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 128,
'command': ['sleep', '10']
}
)
batch_client.deregister_job_definition(jobDefinition=resp['jobDefinitionArn'])
resp = batch_client.describe_job_definitions()
len(resp['jobDefinitions']).should.equal(0)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_describe_task_definition():
ec2_client, iam_client, ecs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
resp = batch_client.register_job_definition(
jobDefinitionName='sleep10',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 128,
'command': ['sleep', '10']
}
)
arn1 = resp['jobDefinitionArn']
resp = batch_client.register_job_definition(
jobDefinitionName='sleep10',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 64,
'command': ['sleep', '10']
}
)
arn2 = resp['jobDefinitionArn']
resp = batch_client.register_job_definition(
jobDefinitionName='test1',
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 64,
'command': ['sleep', '10']
}
)
arn3 = resp['jobDefinitionArn']
resp = batch_client.describe_job_definitions(
jobDefinitionName='sleep10'
)
len(resp['jobDefinitions']).should.equal(2)
resp = batch_client.describe_job_definitions()
len(resp['jobDefinitions']).should.equal(3)
resp = batch_client.describe_job_definitions(
jobDefinitions=['sleep10', 'test1']
)
len(resp['jobDefinitions']).should.equal(3)