Added CreateJobQueue and DescribeJobQueue

This commit is contained in:
Terry Cain 2017-10-03 23:21:06 +01:00
parent 88a11b21ae
commit 15218df12f
No known key found for this signature in database
GPG Key ID: 14D90844E4E9B9F3
5 changed files with 237 additions and 3 deletions

View File

@ -10,7 +10,7 @@ from moto.ec2 import ec2_backends
from moto.ecs import ecs_backends
from .exceptions import InvalidParameterValueException, InternalFailure, ClientException
from .utils import make_arn_for_compute_env
from .utils import make_arn_for_compute_env, make_arn_for_job_queue
from moto.ec2.exceptions import InvalidSubnetIdError
from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES
from moto.iam.exceptions import IAMNotFoundException
@ -41,12 +41,50 @@ class ComputeEnvironment(BaseModel):
self.ecs_name = name
class JobQueue(BaseModel):
def __init__(self, name, priority, state, environments, env_order_json, region_name):
"""
:param name: Job queue name
:type name: str
:param priority: Job queue priority
:type priority: int
:param state: Either ENABLED or DISABLED
:type state: str
:param environments: Compute Environments
:type environments: list of ComputeEnvironment
:param env_order_json: Compute Environments JSON for use when describing
:type env_order_json: list of dict
:param region_name: Region name
:type region_name: str
"""
self.name = name
self.priority = priority
self.state = state
self.environments = environments
self.env_order_json = env_order_json
self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name)
self.status = 'VALID'
def describe(self):
result = {
'computeEnvironmentOrder': self.env_order_json,
'jobQueueArn': self.arn,
'jobQueueName': self.name,
'priority': self.priority,
'state': self.state,
'status': self.status
}
return result
class BatchBackend(BaseBackend):
def __init__(self, region_name=None):
super(BatchBackend, self).__init__()
self.region_name = region_name
self._compute_environments = {}
self._job_queues = {}
@property
def iam_backend(self):
@ -77,7 +115,7 @@ class BatchBackend(BaseBackend):
self.__dict__ = {}
self.__init__(region_name)
def get_compute_environment_arn(self, arn):
def get_compute_environment_by_arn(self, arn):
return self._compute_environments.get(arn)
def get_compute_environment_by_name(self, name):
@ -95,11 +133,34 @@ class BatchBackend(BaseBackend):
:return: Compute Environment or None
:rtype: ComputeEnvironment or None
"""
env = self.get_compute_environment_arn(identifier)
env = self.get_compute_environment_by_arn(identifier)
if env is None:
env = self.get_compute_environment_by_name(identifier)
return env
def get_job_queue_by_arn(self, arn):
return self._job_queues.get(arn)
def get_job_queue_by_name(self, name):
for comp_env in self._job_queues.values():
if comp_env.name == name:
return comp_env
return None
def get_job_queue(self, identifier):
"""
Get job queue by name or ARN
:param identifier: Name or ARN
:type identifier: str
:return: Job Queue or None
:rtype: JobQueue or None
"""
env = self.get_job_queue_by_arn(identifier)
if env is None:
env = self.get_job_queue_by_name(identifier)
return env
def describe_compute_environments(self, environments=None, max_results=None, next_token=None):
envs = set()
if environments is not None:
@ -333,6 +394,66 @@ class BatchBackend(BaseBackend):
return compute_env.name, compute_env.arn
def create_job_queue(self, queue_name, priority, state, compute_env_order):
"""
Create a job queue
:param queue_name: Queue name
:type queue_name: str
:param priority: Queue priority
:type priority: int
:param state: Queue state
:type state: string
:param compute_env_order: Compute environment list
:type compute_env_order: list of dict
:return: Tuple of Name, ARN
:rtype: tuple of str
"""
for variable, var_name in ((queue_name, 'jobQueueName'), (priority, 'priority'), (state, 'state'), (compute_env_order, 'computeEnvironmentOrder')):
if variable is None:
raise ClientException('{0} must be provided'.format(var_name))
if state not in ('ENABLED', 'DISABLED'):
raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state))
if self.get_job_queue_by_name(queue_name) is not None:
raise ClientException('Job queue {0} already exists'.format(queue_name))
if len(compute_env_order) == 0:
raise ClientException('At least 1 compute environment must be provided')
try:
# orders and extracts computeEnvironment names
ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])]
env_objects = []
# Check each ARN exists, then make a list of compute env's
for arn in ordered_compute_environments:
env = self.get_compute_environment_by_arn(arn)
if env is None:
raise ClientException('Compute environment {0} does not exist'.format(arn))
env_objects.append(env)
except Exception:
raise ClientException('computeEnvironmentOrder is malformed')
# Create new Job Queue
queue = JobQueue(queue_name, priority, state, env_objects, compute_env_order, self.region_name)
self._job_queues[queue.arn] = queue
return queue_name, queue.arn
def describe_job_queues(self, job_queues=None, max_results=None, next_token=None):
envs = set()
if job_queues is not None:
envs = set(job_queues)
result = []
for arn, job_queue in self._job_queues.items():
# Filter shortcut
if len(envs) > 0 and arn not in envs and job_queue.name not in envs:
continue
result.append(job_queue.describe())
return result
available_regions = boto3.session.Session().get_available_regions("batch")
batch_backends = {region: BatchBackend(region_name=region) for region in available_regions}

View File

@ -88,6 +88,7 @@ class BatchResponse(BaseResponse):
return ''
# UpdateComputeEnvironment
def updatecomputeenvironment(self):
compute_env_name = self._get_param('computeEnvironment')
compute_resource = self._get_param('computeResources')
@ -110,3 +111,38 @@ class BatchResponse(BaseResponse):
}
return json.dumps(result)
# CreateJobQueue
def createjobqueue(self):
compute_env_order = self._get_param('computeEnvironmentOrder')
queue_name = self._get_param('jobQueueName')
priority = self._get_param('priority')
state = self._get_param('state')
try:
name, arn = self.batch_backend.create_job_queue(
queue_name=queue_name,
priority=priority,
state=state,
compute_env_order=compute_env_order
)
except AWSError as err:
return err.response()
result = {
'jobQueueArn': arn,
'jobQueueName': name
}
return json.dumps(result)
# DescribeJobQueues
def describejobqueues(self):
job_queues = self._get_param('jobQueues')
max_results = self._get_param('maxResults') # Ignored, should be int
next_token = self._get_param('nextToken') # Ignored
queues = self.batch_backend.describe_job_queues(job_queues, max_results=max_results, next_token=next_token)
result = {'jobQueues': queues}
return json.dumps(result)

View File

@ -10,4 +10,6 @@ url_paths = {
'{0}/v1/describecomputeenvironments$': BatchResponse.dispatch,
'{0}/v1/deletecomputeenvironment': BatchResponse.dispatch,
'{0}/v1/updatecomputeenvironment': BatchResponse.dispatch,
'{0}/v1/createjobqueue': BatchResponse.dispatch,
'{0}/v1/describejobqueues': BatchResponse.dispatch,
}

View File

@ -3,3 +3,7 @@ from __future__ import unicode_literals
def make_arn_for_compute_env(account_id, name, region_name):
return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name)
def make_arn_for_job_queue(account_id, name, region_name):
return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name)

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import boto3
from botocore.exceptions import ClientError
import sure # noqa
from moto import mock_batch, mock_iam, mock_ec2, mock_ecs
@ -265,3 +266,73 @@ def test_update_unmanaged_compute_environment_state():
resp = batch_client.describe_compute_environments()
len(resp['computeEnvironments']).should.equal(1)
resp['computeEnvironments'][0]['state'].should.equal('DISABLED')
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_job_queue():
ec2_client, iam_client, ecs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = 'test_compute_env'
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type='UNMANAGED',
state='ENABLED',
serviceRole=iam_arn
)
arn = resp['computeEnvironmentArn']
resp = batch_client.create_job_queue(
jobQueueName='test_job_queue',
state='ENABLED',
priority=123,
computeEnvironmentOrder=[
{
'order': 123,
'computeEnvironment': arn
},
]
)
resp.should.contain('jobQueueArn')
resp.should.contain('jobQueueName')
queue_arn = resp['jobQueueArn']
resp = batch_client.describe_job_queues()
resp.should.contain('jobQueues')
resp['jobQueues'][0]['jobQueueArn'].should.equal(queue_arn)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_job_queue_bad_arn():
ec2_client, iam_client, ecs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = 'test_compute_env'
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type='UNMANAGED',
state='ENABLED',
serviceRole=iam_arn
)
arn = resp['computeEnvironmentArn']
try:
batch_client.create_job_queue(
jobQueueName='test_job_queue',
state='ENABLED',
priority=123,
computeEnvironmentOrder=[
{
'order': 123,
'computeEnvironment': arn + 'LALALA'
},
]
)
except ClientError as err:
err.response['Error']['Code'].should.equal('ClientException')