From 15218df12fafd451ba03317696f7a71323a1e0dc Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 3 Oct 2017 23:21:06 +0100 Subject: [PATCH] Added CreateJobQueue and DescribeJobQueue --- moto/batch/models.py | 127 ++++++++++++++++++++++++++++++++- moto/batch/responses.py | 36 ++++++++++ moto/batch/urls.py | 2 + moto/batch/utils.py | 4 ++ tests/test_batch/test_batch.py | 71 ++++++++++++++++++ 5 files changed, 237 insertions(+), 3 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 8572a46c0..e336a60d7 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -10,7 +10,7 @@ from moto.ec2 import ec2_backends from moto.ecs import ecs_backends from .exceptions import InvalidParameterValueException, InternalFailure, ClientException -from .utils import make_arn_for_compute_env +from .utils import make_arn_for_compute_env, make_arn_for_job_queue from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException @@ -41,12 +41,50 @@ class ComputeEnvironment(BaseModel): self.ecs_name = name +class JobQueue(BaseModel): + def __init__(self, name, priority, state, environments, env_order_json, region_name): + """ + :param name: Job queue name + :type name: str + :param priority: Job queue priority + :type priority: int + :param state: Either ENABLED or DISABLED + :type state: str + :param environments: Compute Environments + :type environments: list of ComputeEnvironment + :param env_order_json: Compute Environments JSON for use when describing + :type env_order_json: list of dict + :param region_name: Region name + :type region_name: str + """ + self.name = name + self.priority = priority + self.state = state + self.environments = environments + self.env_order_json = env_order_json + self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name) + self.status = 'VALID' + + def describe(self): + result = { + 'computeEnvironmentOrder': self.env_order_json, + 'jobQueueArn': self.arn, + 'jobQueueName': self.name, + 'priority': self.priority, + 'state': self.state, + 'status': self.status + } + + return result + + class BatchBackend(BaseBackend): def __init__(self, region_name=None): super(BatchBackend, self).__init__() self.region_name = region_name self._compute_environments = {} + self._job_queues = {} @property def iam_backend(self): @@ -77,7 +115,7 @@ class BatchBackend(BaseBackend): self.__dict__ = {} self.__init__(region_name) - def get_compute_environment_arn(self, arn): + def get_compute_environment_by_arn(self, arn): return self._compute_environments.get(arn) def get_compute_environment_by_name(self, name): @@ -95,11 +133,34 @@ class BatchBackend(BaseBackend): :return: Compute Environment or None :rtype: ComputeEnvironment or None """ - env = self.get_compute_environment_arn(identifier) + env = self.get_compute_environment_by_arn(identifier) if env is None: env = self.get_compute_environment_by_name(identifier) return env + def get_job_queue_by_arn(self, arn): + return self._job_queues.get(arn) + + def get_job_queue_by_name(self, name): + for comp_env in self._job_queues.values(): + if comp_env.name == name: + return comp_env + return None + + def get_job_queue(self, identifier): + """ + Get job queue by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job Queue or None + :rtype: JobQueue or None + """ + env = self.get_job_queue_by_arn(identifier) + if env is None: + env = self.get_job_queue_by_name(identifier) + return env + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -333,6 +394,66 @@ class BatchBackend(BaseBackend): return compute_env.name, compute_env.arn + def create_job_queue(self, queue_name, priority, state, compute_env_order): + """ + Create a job queue + + :param queue_name: Queue name + :type queue_name: str + :param priority: Queue priority + :type priority: int + :param state: Queue state + :type state: string + :param compute_env_order: Compute environment list + :type compute_env_order: list of dict + :return: Tuple of Name, ARN + :rtype: tuple of str + """ + for variable, var_name in ((queue_name, 'jobQueueName'), (priority, 'priority'), (state, 'state'), (compute_env_order, 'computeEnvironmentOrder')): + if variable is None: + raise ClientException('{0} must be provided'.format(var_name)) + + if state not in ('ENABLED', 'DISABLED'): + raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state)) + if self.get_job_queue_by_name(queue_name) is not None: + raise ClientException('Job queue {0} already exists'.format(queue_name)) + + if len(compute_env_order) == 0: + raise ClientException('At least 1 compute environment must be provided') + try: + # orders and extracts computeEnvironment names + ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])] + env_objects = [] + # Check each ARN exists, then make a list of compute env's + for arn in ordered_compute_environments: + env = self.get_compute_environment_by_arn(arn) + if env is None: + raise ClientException('Compute environment {0} does not exist'.format(arn)) + env_objects.append(env) + except Exception: + raise ClientException('computeEnvironmentOrder is malformed') + + # Create new Job Queue + queue = JobQueue(queue_name, priority, state, env_objects, compute_env_order, self.region_name) + self._job_queues[queue.arn] = queue + + return queue_name, queue.arn + + def describe_job_queues(self, job_queues=None, max_results=None, next_token=None): + envs = set() + if job_queues is not None: + envs = set(job_queues) + + result = [] + for arn, job_queue in self._job_queues.items(): + # Filter shortcut + if len(envs) > 0 and arn not in envs and job_queue.name not in envs: + continue + + result.append(job_queue.describe()) + + return result + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 86ee4fdfe..661b9c7c2 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -88,6 +88,7 @@ class BatchResponse(BaseResponse): return '' + # UpdateComputeEnvironment def updatecomputeenvironment(self): compute_env_name = self._get_param('computeEnvironment') compute_resource = self._get_param('computeResources') @@ -110,3 +111,38 @@ class BatchResponse(BaseResponse): } return json.dumps(result) + + # CreateJobQueue + def createjobqueue(self): + compute_env_order = self._get_param('computeEnvironmentOrder') + queue_name = self._get_param('jobQueueName') + priority = self._get_param('priority') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.create_job_queue( + queue_name=queue_name, + priority=priority, + state=state, + compute_env_order=compute_env_order + ) + except AWSError as err: + return err.response() + + result = { + 'jobQueueArn': arn, + 'jobQueueName': name + } + + return json.dumps(result) + + # DescribeJobQueues + def describejobqueues(self): + job_queues = self._get_param('jobQueues') + max_results = self._get_param('maxResults') # Ignored, should be int + next_token = self._get_param('nextToken') # Ignored + + queues = self.batch_backend.describe_job_queues(job_queues, max_results=max_results, next_token=next_token) + + result = {'jobQueues': queues} + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index ef8f7927a..227e78ecf 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -10,4 +10,6 @@ url_paths = { '{0}/v1/describecomputeenvironments$': BatchResponse.dispatch, '{0}/v1/deletecomputeenvironment': BatchResponse.dispatch, '{0}/v1/updatecomputeenvironment': BatchResponse.dispatch, + '{0}/v1/createjobqueue': BatchResponse.dispatch, + '{0}/v1/describejobqueues': BatchResponse.dispatch, } diff --git a/moto/batch/utils.py b/moto/batch/utils.py index d323a9bf7..68c6a3581 100644 --- a/moto/batch/utils.py +++ b/moto/batch/utils.py @@ -3,3 +3,7 @@ from __future__ import unicode_literals def make_arn_for_compute_env(account_id, name, region_name): return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name) + + +def make_arn_for_job_queue(account_id, name, region_name): + return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index 159f255c3..6bf68a6fc 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import boto3 +from botocore.exceptions import ClientError import sure # noqa from moto import mock_batch, mock_iam, mock_ec2, mock_ecs @@ -265,3 +266,73 @@ def test_update_unmanaged_compute_environment_state(): resp = batch_client.describe_compute_environments() len(resp['computeEnvironments']).should.equal(1) resp['computeEnvironments'][0]['state'].should.equal('DISABLED') + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + resp.should.contain('jobQueueArn') + resp.should.contain('jobQueueName') + queue_arn = resp['jobQueueArn'] + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + resp['jobQueues'][0]['jobQueueArn'].should.equal(queue_arn) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_job_queue_bad_arn(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + try: + batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + 'LALALA' + }, + ] + ) + except ClientError as err: + err.response['Error']['Code'].should.equal('ClientException')