From bba6d23eaeab974ff1788eb6291b9e44785e5f05 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 26 Sep 2017 17:37:26 +0100 Subject: [PATCH 01/22] Started on batch --- moto/batch/__init__.py | 6 ++++++ moto/batch/exceptions.py | 3 +++ moto/batch/models.py | 23 +++++++++++++++++++++++ moto/batch/responses.py | 14 ++++++++++++++ tests/test_batch/test_batch.py | 11 +++++++++++ tests/test_batch/test_server.py | 16 ++++++++++++++++ 6 files changed, 73 insertions(+) create mode 100644 moto/batch/__init__.py create mode 100644 moto/batch/exceptions.py create mode 100644 moto/batch/models.py create mode 100644 moto/batch/responses.py create mode 100644 tests/test_batch/test_batch.py create mode 100644 tests/test_batch/test_server.py diff --git a/moto/batch/__init__.py b/moto/batch/__init__.py new file mode 100644 index 000000000..6002b6fc7 --- /dev/null +++ b/moto/batch/__init__.py @@ -0,0 +1,6 @@ +from __future__ import unicode_literals +from .models import batch_backends +from ..core.models import base_decorator + +batch_backend = batch_backends['us-east-1'] +mock_batch = base_decorator(batch_backends) diff --git a/moto/batch/exceptions.py b/moto/batch/exceptions.py new file mode 100644 index 000000000..e598ee7af --- /dev/null +++ b/moto/batch/exceptions.py @@ -0,0 +1,3 @@ +from __future__ import unicode_literals +from moto.core.exceptions import RESTError + diff --git a/moto/batch/models.py b/moto/batch/models.py new file mode 100644 index 000000000..a54b30c32 --- /dev/null +++ b/moto/batch/models.py @@ -0,0 +1,23 @@ +from __future__ import unicode_literals +import boto3 +from moto.core import BaseBackend, BaseModel + + +class BatchBackend(BaseBackend): + def __init__(self, region_name=None): + super(BatchBackend, self).__init__() + self.region_name = region_name + + def reset(self): + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def create_compute_environment(self, compute_environment_name, type, state, compute_resources, service_role): + # implement here + return compute_environment_name, compute_environment_arn + # add methods from here + + +available_regions = boto3.session.Session().get_available_regions("batch") +batch_backends = {region: BatchBackend for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py new file mode 100644 index 000000000..d91af8a77 --- /dev/null +++ b/moto/batch/responses.py @@ -0,0 +1,14 @@ +from __future__ import unicode_literals +from moto.core.responses import BaseResponse +from .models import batch_backends + + +class BatchResponse(BaseResponse): + @property + def batch_backend(self): + return batch_backends[self.region] + + # add methods from here + + +# add teampltes from here diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py new file mode 100644 index 000000000..eafd32eae --- /dev/null +++ b/tests/test_batch/test_batch.py @@ -0,0 +1,11 @@ +from __future__ import unicode_literals + +import boto3 +import sure # noqa +from moto import mock_batch + + +@mock_batch +def test_list(): + # do test + pass \ No newline at end of file diff --git a/tests/test_batch/test_server.py b/tests/test_batch/test_server.py new file mode 100644 index 000000000..7c0d2b3a1 --- /dev/null +++ b/tests/test_batch/test_server.py @@ -0,0 +1,16 @@ +from __future__ import unicode_literals + +import sure # noqa + +import moto.server as server +from moto import mock_batch + +''' +Test the different server responses +''' + +@mock_batch +def test_batch_list(): + backend = server.create_backend_app("batch") + test_client = backend.test_client() + # do test \ No newline at end of file From f9c8836d54038fe85e703d0f241ab3d0206215e7 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 26 Sep 2017 19:55:44 +0100 Subject: [PATCH 02/22] . --- moto/__init__.py | 1 + moto/backends.py | 2 ++ moto/batch/urls.py | 10 ++++++++++ moto/batch/utils.py | 6 ++++++ 4 files changed, 19 insertions(+) create mode 100644 moto/batch/urls.py create mode 100644 moto/batch/utils.py diff --git a/moto/__init__.py b/moto/__init__.py index a832def53..cce157914 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -9,6 +9,7 @@ from .acm import mock_acm # flake8: noqa from .apigateway import mock_apigateway, mock_apigateway_deprecated # flake8: noqa from .autoscaling import mock_autoscaling, mock_autoscaling_deprecated # flake8: noqa from .awslambda import mock_lambda, mock_lambda_deprecated # flake8: noqa +from .batch import mock_batch # flake8: noqa from .cloudformation import mock_cloudformation, mock_cloudformation_deprecated # flake8: noqa from .cloudwatch import mock_cloudwatch, mock_cloudwatch_deprecated # flake8: noqa from .datapipeline import mock_datapipeline, mock_datapipeline_deprecated # flake8: noqa diff --git a/moto/backends.py b/moto/backends.py index da9d1821d..2725088d9 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -4,6 +4,7 @@ from moto.acm import acm_backends from moto.apigateway import apigateway_backends from moto.autoscaling import autoscaling_backends from moto.awslambda import lambda_backends +from moto.batch import batch_backends from moto.cloudformation import cloudformation_backends from moto.cloudwatch import cloudwatch_backends from moto.core import moto_api_backends @@ -38,6 +39,7 @@ BACKENDS = { 'acm': acm_backends, 'apigateway': apigateway_backends, 'autoscaling': autoscaling_backends, + 'batch': batch_backends, 'cloudformation': cloudformation_backends, 'cloudwatch': cloudwatch_backends, 'datapipeline': datapipeline_backends, diff --git a/moto/batch/urls.py b/moto/batch/urls.py new file mode 100644 index 000000000..27cd9fc51 --- /dev/null +++ b/moto/batch/urls.py @@ -0,0 +1,10 @@ +from __future__ import unicode_literals +from .responses import BatchResponse + +url_bases = [ + "https?://batch.(.+).amazonaws.com", +] + +url_paths = { + '{0}/$': BatchResponse.dispatch, +} diff --git a/moto/batch/utils.py b/moto/batch/utils.py new file mode 100644 index 000000000..33e474d61 --- /dev/null +++ b/moto/batch/utils.py @@ -0,0 +1,6 @@ +from __future__ import unicode_literals +import uuid + + +def make_arn_for_topic(account_id, name, region_name): + return "arn:aws:sns:{0}:{1}:{2}".format(region_name, account_id, name) From 56e4300ad4d3c729093b858e7f9d8b4008b3d4f3 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 26 Sep 2017 22:22:59 +0100 Subject: [PATCH 03/22] Added preliminary CreateComputeEnvironment --- moto/batch/exceptions.py | 31 +++++++- moto/batch/models.py | 141 +++++++++++++++++++++++++++++++-- moto/batch/responses.py | 48 ++++++++++- moto/batch/urls.py | 2 +- moto/batch/utils.py | 5 +- moto/iam/models.py | 6 ++ tests/test_batch/test_batch.py | 85 +++++++++++++++++++- 7 files changed, 302 insertions(+), 16 deletions(-) diff --git a/moto/batch/exceptions.py b/moto/batch/exceptions.py index e598ee7af..cd6031a95 100644 --- a/moto/batch/exceptions.py +++ b/moto/batch/exceptions.py @@ -1,3 +1,32 @@ from __future__ import unicode_literals -from moto.core.exceptions import RESTError +import json + +class AWSError(Exception): + CODE = None + STATUS = 400 + + def __init__(self, message, code=None, status=None): + self.message = message + self.code = code if code is not None else self.CODE + self.status = status if status is not None else self.STATUS + + def response(self): + return json.dumps({'__type': self.code, 'message': self.message}), dict(status=self.status) + + +class InvalidRequestException(AWSError): + CODE = 'InvalidRequestException' + + +class InvalidParameterValueException(AWSError): + CODE = 'InvalidParameterValue' + + +class ValidationError(AWSError): + CODE = 'ValidationError' + + +class InternalFailure(AWSError): + CODE = 'InternalFailure' + STATUS = 500 diff --git a/moto/batch/models.py b/moto/batch/models.py index a54b30c32..c7def48d1 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -1,6 +1,28 @@ from __future__ import unicode_literals import boto3 +import re from moto.core import BaseBackend, BaseModel +from moto.iam import iam_backends +from moto.ec2 import ec2_backends + +from .exceptions import InvalidParameterValueException, InternalFailure +from .utils import make_arn_for_compute_env +from moto.ec2.exceptions import InvalidSubnetIdError +from moto.iam.exceptions import IAMNotFoundException + + +DEFAULT_ACCOUNT_ID = 123456789012 +COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9_]{1,128}$') + + +class ComputeEnvironment(BaseModel): + def __init__(self, compute_environment_name, _type, state, compute_resources, service_role, region_name): + self.compute_environment_name = compute_environment_name + self.type = _type + self.state = state + self.compute_resources = compute_resources + self.service_role = service_role + self.arn = make_arn_for_compute_env(DEFAULT_ACCOUNT_ID, compute_environment_name, region_name) class BatchBackend(BaseBackend): @@ -8,16 +30,125 @@ class BatchBackend(BaseBackend): super(BatchBackend, self).__init__() self.region_name = region_name + self._compute_environments = {} + + @property + def iam_backend(self): + """ + :return: IAM Backend + :rtype: moto.iam.models.IAMBackend + """ + return iam_backends['global'] + + @property + def ec2_backend(self): + """ + :return: EC2 Backend + :rtype: moto.ec2.models.EC2Backend + """ + return ec2_backends[self.region_name] + def reset(self): region_name = self.region_name self.__dict__ = {} self.__init__(region_name) - def create_compute_environment(self, compute_environment_name, type, state, compute_resources, service_role): - # implement here - return compute_environment_name, compute_environment_arn - # add methods from here + def get_compute_environment(self, arn): + return self._compute_environments.get(arn) + + def get_compute_environment_by_name(self, name): + for comp_env in self._compute_environments.values(): + if comp_env.name == name: + return comp_env + return None + + def create_compute_environment(self, compute_environment_name, _type, state, compute_resources, service_role): + # Validate + if COMPUTE_ENVIRONMENT_NAME_REGEX.match(compute_environment_name) is None: + raise InvalidParameterValueException('Compute environment name does not match ^[A-Za-z0-9_]{1,128}$') + + if self.get_compute_environment_by_name(compute_environment_name) is not None: + raise InvalidParameterValueException('A compute environment already exists with the name {0}'.format(compute_environment_name)) + + # Look for IAM role + try: + self.iam_backend.get_role_by_arn(service_role) + except IAMNotFoundException: + raise InvalidParameterValueException('Could not find IAM role {0}'.format(service_role)) + + if _type not in ('MANAGED', 'UNMANAGED'): + raise InvalidParameterValueException('type {0} must be one of MANAGED | UNMANAGED'.format(service_role)) + + if state is not None and state not in ('ENABLED', 'DISABLED'): + raise InvalidParameterValueException('state {0} must be one of ENABLED | DISABLED'.format(state)) + + if compute_resources is None and _type == 'MANAGED': + raise InvalidParameterValueException('computeResources must be specified when creating a MANAGED environment'.format(state)) + elif compute_resources is not None: + self._validate_compute_resources(compute_resources) + + # By here, all values except SPOT ones have been validated + new_comp_env = ComputeEnvironment( + compute_environment_name, _type, state, + compute_resources, service_role, + region_name=self.region_name + ) + self._compute_environments[new_comp_env.arn] = new_comp_env + + # TODO scale out if MANAGED and we have compute instance types + + return compute_environment_name, new_comp_env.arn + + def _validate_compute_resources(self, cr): + if 'instanceRole' not in cr: + raise InvalidParameterValueException('computeResources must contain instanceRole') + elif self.iam_backend.get_role_by_arn(cr['instanceRole']) is None: + raise InvalidParameterValueException('could not find instanceRole {0}'.format(cr['instanceRole'])) + + # TODO move the not in checks to a loop, or create a json schema validator class + if 'maxvCpus' not in cr: + raise InvalidParameterValueException('computeResources must contain maxVCpus') + if 'minvCpus' not in cr: + raise InvalidParameterValueException('computeResources must contain minVCpus') + if cr['maxvCpus'] < 0: + raise InvalidParameterValueException('maxVCpus must be positive') + if cr['minvCpus'] < 0: + raise InvalidParameterValueException('minVCpus must be positive') + if cr['maxvCpus'] < cr['minvCpus']: + raise InvalidParameterValueException('maxVCpus must be greater than minvCpus') + + # TODO check instance types when that logic exists + if 'instanceTypes' not in cr: + raise InvalidParameterValueException('computeResources must contain instanceTypes') + if len(cr['instanceTypes']) == 0: + raise InvalidParameterValueException('At least 1 instance type must be provided') + + if 'securityGroupIds' not in cr: + raise InvalidParameterValueException('computeResources must contain securityGroupIds') + for sec_id in cr['securityGroupIds']: + if self.ec2_backend.get_security_group_from_id(sec_id) is None: + raise InvalidParameterValueException('security group {0} does not exist'.format(sec_id)) + if len(cr['securityGroupIds']) == 0: + raise InvalidParameterValueException('At least 1 security group must be provided') + + if 'subnets' not in cr: + raise InvalidParameterValueException('computeResources must contain subnets') + for subnet_id in cr['subnets']: + try: + self.ec2_backend.get_subnet(subnet_id) + except InvalidSubnetIdError: + raise InvalidParameterValueException('subnet {0} does not exist'.format(subnet_id)) + if len(cr['subnets']) == 0: + raise InvalidParameterValueException('At least 1 subnet must be provided') + + if 'type' not in cr: + raise InvalidParameterValueException('computeResources must contain type') + if cr['type'] not in ('EC2', 'SPOT'): + raise InvalidParameterValueException('computeResources.type must be either EC2 | SPOT') + + if cr['type'] == 'SPOT': + raise InternalFailure('SPOT NOT SUPPORTED YET') available_regions = boto3.session.Session().get_available_regions("batch") -batch_backends = {region: BatchBackend for region in available_regions} +batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index d91af8a77..0368906f0 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -1,14 +1,58 @@ from __future__ import unicode_literals from moto.core.responses import BaseResponse from .models import batch_backends +from six.moves.urllib.parse import urlsplit + +from .exceptions import AWSError + +import json class BatchResponse(BaseResponse): + def _error(self, code, message): + return json.dumps({'__type': code, 'message': message}), dict(status=400) + @property def batch_backend(self): return batch_backends[self.region] - # add methods from here + @property + def json(self): + if not hasattr(self, '_json'): + self._json = json.loads(self.body) + return self._json + def _get_param(self, param_name, if_none=None): + val = self.json.get(param_name) + if val is not None: + return val + return if_none -# add teampltes from here + def _get_action(self): + # Return element after the /v1/* + return urlsplit(self.uri).path.lstrip('/').split('/')[1] + + # CreateComputeEnvironment + def createcomputeenvironment(self): + compute_env_name = self._get_param('computeEnvironmentName') + compute_resource = self._get_param('computeResources') + service_role = self._get_param('serviceRole') + state = self._get_param('state') + _type = self._get_param('type') + + try: + name, arn = self.batch_backend.create_compute_environment( + compute_environment_name=compute_env_name, + _type=_type, state=state, + compute_resources=compute_resource, + service_role=service_role + ) + except AWSError as err: + return err.response() + + result = { + 'computeEnvironmentArn': arn, + 'computeEnvironmentName': name + } + + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 27cd9fc51..93f8a2f23 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -6,5 +6,5 @@ url_bases = [ ] url_paths = { - '{0}/$': BatchResponse.dispatch, + '{0}/v1/createcomputeenvironment': BatchResponse.dispatch, } diff --git a/moto/batch/utils.py b/moto/batch/utils.py index 33e474d61..d323a9bf7 100644 --- a/moto/batch/utils.py +++ b/moto/batch/utils.py @@ -1,6 +1,5 @@ from __future__ import unicode_literals -import uuid -def make_arn_for_topic(account_id, name, region_name): - return "arn:aws:sns:{0}:{1}:{2}".format(region_name, account_id, name) +def make_arn_for_compute_env(account_id, name, region_name): + return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name) diff --git a/moto/iam/models.py b/moto/iam/models.py index a7e584284..34efb1a22 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -534,6 +534,12 @@ class IAMBackend(BaseBackend): return role raise IAMNotFoundException("Role {0} not found".format(role_name)) + def get_role_by_arn(self, arn): + for role in self.get_roles(): + if role.arn == arn: + return role + raise IAMNotFoundException("Role {0} not found".format(arn)) + def delete_role(self, role_name): for role in self.get_roles(): if role.name == role_name: diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index eafd32eae..3aae48e1e 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -2,10 +2,87 @@ from __future__ import unicode_literals import boto3 import sure # noqa -from moto import mock_batch +from moto import mock_batch, mock_iam, mock_ec2 +DEFAULT_REGION = 'eu-central-1' + + +def _get_clients(): + return boto3.client('ec2', region_name=DEFAULT_REGION), \ + boto3.client('iam', region_name=DEFAULT_REGION), \ + boto3.client('batch', region_name=DEFAULT_REGION) + + +def _setup(ec2_client, iam_client): + """ + Do prerequisite setup + :return: VPC ID, Subnet ID, Security group ID, IAM Role ARN + :rtype: tuple + """ + resp = ec2_client.create_vpc(CidrBlock='172.30.0.0/24') + vpc_id = resp['Vpc']['VpcId'] + resp = ec2_client.create_subnet( + AvailabilityZone='eu-central-1a', + CidrBlock='172.30.0.0/25', + VpcId=vpc_id + ) + subnet_id = resp['Subnet']['SubnetId'] + resp = ec2_client.create_security_group( + Description='test_sg_desc', + GroupName='test_sg', + VpcId=vpc_id + ) + sg_id = resp['GroupId'] + + resp = iam_client.create_role( + RoleName='TestRole', + AssumeRolePolicyDocument='some_policy' + ) + iam_arn = resp['Role']['Arn'] + + return vpc_id, subnet_id, sg_id, iam_arn + + +# Yes, yes it talks to all the things +@mock_ec2 +@mock_iam @mock_batch -def test_list(): - # do test - pass \ No newline at end of file +def test_create_compute_environment(): + ec2_client, iam_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='MANAGED', + state='ENABLED', + computeResources={ + 'type': 'EC2', + 'minvCpus': 123, + 'maxvCpus': 123, + 'desiredvCpus': 123, + 'instanceTypes': [ + 'some_instance_type', + ], + 'imageId': 'some_image_id', + 'subnets': [ + subnet_id, + ], + 'securityGroupIds': [ + sg_id, + ], + 'ec2KeyPair': 'string', + 'instanceRole': iam_arn, + 'tags': { + 'string': 'string' + }, + 'bidPercentage': 123, + 'spotIamFleetRole': 'string' + }, + serviceRole=iam_arn + ) + resp.should.contain('computeEnvironmentArn') + resp['computeEnvironmentName'].should.equal(compute_name) + +# TODO create 1000s of tests to test complex option combinations of create environment From f95d72c37c69bf1a4b042d36bd623973d7970218 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 29 Sep 2017 23:29:36 +0100 Subject: [PATCH 04/22] Finialised create compute environment + describe environments --- moto/batch/models.py | 160 +++++++++++++++++++++++++++++---- moto/batch/responses.py | 19 +++- moto/batch/urls.py | 1 + tests/test_batch/test_batch.py | 84 +++++++++++++++-- 4 files changed, 237 insertions(+), 27 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index c7def48d1..7ed75e749 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -1,13 +1,18 @@ from __future__ import unicode_literals import boto3 import re +from itertools import cycle +import six +import uuid from moto.core import BaseBackend, BaseModel from moto.iam import iam_backends from moto.ec2 import ec2_backends +from moto.ecs import ecs_backends from .exceptions import InvalidParameterValueException, InternalFailure from .utils import make_arn_for_compute_env from moto.ec2.exceptions import InvalidSubnetIdError +from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException @@ -17,13 +22,22 @@ COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9_]{1,128}$') class ComputeEnvironment(BaseModel): def __init__(self, compute_environment_name, _type, state, compute_resources, service_role, region_name): - self.compute_environment_name = compute_environment_name + self.name = compute_environment_name self.type = _type self.state = state self.compute_resources = compute_resources self.service_role = service_role self.arn = make_arn_for_compute_env(DEFAULT_ACCOUNT_ID, compute_environment_name, region_name) + self.instances = [] + self.ecs_arn = None + + def add_instance(self, instance): + self.instances.append(instance) + + def set_ecs_arn(self, arn): + self.ecs_arn = arn + class BatchBackend(BaseBackend): def __init__(self, region_name=None): @@ -48,6 +62,14 @@ class BatchBackend(BaseBackend): """ return ec2_backends[self.region_name] + @property + def ecs_backend(self): + """ + :return: ECS Backend + :rtype: moto.ecs.models.EC2ContainerServiceBackend + """ + return ecs_backends[self.region_name] + def reset(self): region_name = self.region_name self.__dict__ = {} @@ -62,6 +84,33 @@ class BatchBackend(BaseBackend): return comp_env return None + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): + envs = set() + if environments is not None: + envs = set(environments) + + result = [] + for arn, environment in self._compute_environments.items(): + # Filter shortcut + if len(envs) > 0 and arn not in envs and environment.name not in envs: + continue + + json_part = { + 'computeEnvironmentArn': arn, + 'computeEnvironmentName': environment.name, + 'ecsClusterArn': environment.ecs_arn, + 'serviceRole': environment.service_role, + 'state': environment.state, + 'type': environment.type, + 'status': 'VALID' + } + if environment.type == 'MANAGED': + json_part['computeResources'] = environment.compute_resources + + result.append(json_part) + + return result + def create_compute_environment(self, compute_environment_name, _type, state, compute_resources, service_role): # Validate if COMPUTE_ENVIRONMENT_NAME_REGEX.match(compute_environment_name) is None: @@ -95,21 +144,53 @@ class BatchBackend(BaseBackend): ) self._compute_environments[new_comp_env.arn] = new_comp_env - # TODO scale out if MANAGED and we have compute instance types + # Ok by this point, everything is legit, so if its Managed then start some instances + if _type == 'MANAGED': + cpus = int(compute_resources.get('desiredvCpus', compute_resources['minvCpus'])) + instance_types = compute_resources['instanceTypes'] + needed_instance_types = self.find_min_instances_to_meet_vcpus(instance_types, cpus) + # Create instances + + # Will loop over and over so we get decent subnet coverage + subnet_cycle = cycle(compute_resources['subnets']) + + for instance_type in needed_instance_types: + reservation = self.ec2_backend.add_instances( + image_id='ami-ecs-optimised', # Todo import AMIs + count=1, + user_data=None, + security_group_names=[], + instance_type=instance_type, + region_name=self.region_name, + subnet_id=six.next(subnet_cycle), + key_name=compute_resources.get('ec2KeyPair', 'AWS_OWNED'), + security_group_ids=compute_resources['securityGroupIds'] + ) + + new_comp_env.add_instance(reservation.instances[0]) + + # Create ECS cluster + # Should be of format P2OnDemand_Batch_UUID + cluster_name = 'OnDemand_Batch_' + str(uuid.uuid4()) + ecs_cluster = self.ecs_backend.create_cluster(cluster_name) + new_comp_env.set_ecs_arn(ecs_cluster.arn) return compute_environment_name, new_comp_env.arn def _validate_compute_resources(self, cr): - if 'instanceRole' not in cr: - raise InvalidParameterValueException('computeResources must contain instanceRole') - elif self.iam_backend.get_role_by_arn(cr['instanceRole']) is None: + """ + Checks contents of sub dictionary for managed clusters + + :param cr: computeResources + :type cr: dict + """ + for param in ('instanceRole', 'maxvCpus', 'minvCpus', 'instanceTypes', 'securityGroupIds', 'subnets', 'type'): + if param not in cr: + raise InvalidParameterValueException('computeResources must contain {0}'.format(param)) + + if self.iam_backend.get_role_by_arn(cr['instanceRole']) is None: raise InvalidParameterValueException('could not find instanceRole {0}'.format(cr['instanceRole'])) - # TODO move the not in checks to a loop, or create a json schema validator class - if 'maxvCpus' not in cr: - raise InvalidParameterValueException('computeResources must contain maxVCpus') - if 'minvCpus' not in cr: - raise InvalidParameterValueException('computeResources must contain minVCpus') if cr['maxvCpus'] < 0: raise InvalidParameterValueException('maxVCpus must be positive') if cr['minvCpus'] < 0: @@ -117,22 +198,18 @@ class BatchBackend(BaseBackend): if cr['maxvCpus'] < cr['minvCpus']: raise InvalidParameterValueException('maxVCpus must be greater than minvCpus') - # TODO check instance types when that logic exists - if 'instanceTypes' not in cr: - raise InvalidParameterValueException('computeResources must contain instanceTypes') if len(cr['instanceTypes']) == 0: raise InvalidParameterValueException('At least 1 instance type must be provided') + for instance_type in cr['instanceTypes']: + if instance_type not in EC2_INSTANCE_TYPES: + raise InvalidParameterValueException('Instance type {0} does not exist'.format(instance_type)) - if 'securityGroupIds' not in cr: - raise InvalidParameterValueException('computeResources must contain securityGroupIds') for sec_id in cr['securityGroupIds']: if self.ec2_backend.get_security_group_from_id(sec_id) is None: raise InvalidParameterValueException('security group {0} does not exist'.format(sec_id)) if len(cr['securityGroupIds']) == 0: raise InvalidParameterValueException('At least 1 security group must be provided') - if 'subnets' not in cr: - raise InvalidParameterValueException('computeResources must contain subnets') for subnet_id in cr['subnets']: try: self.ec2_backend.get_subnet(subnet_id) @@ -141,14 +218,59 @@ class BatchBackend(BaseBackend): if len(cr['subnets']) == 0: raise InvalidParameterValueException('At least 1 subnet must be provided') - if 'type' not in cr: - raise InvalidParameterValueException('computeResources must contain type') if cr['type'] not in ('EC2', 'SPOT'): raise InvalidParameterValueException('computeResources.type must be either EC2 | SPOT') if cr['type'] == 'SPOT': raise InternalFailure('SPOT NOT SUPPORTED YET') + @staticmethod + def find_min_instances_to_meet_vcpus(instance_types, target): + """ + Finds the minimum needed instances to meed a vcpu target + + :param instance_types: Instance types, like ['t2.medium', 't2.small'] + :type instance_types: list of str + :param target: VCPU target + :type target: float + :return: List of instance types + :rtype: list of str + """ + # vcpus = [ (vcpus, instance_type), (vcpus, instance_type), ... ] + instance_vcpus = [] + instances = [] + + for instance_type in instance_types: + instance_vcpus.append( + (EC2_INSTANCE_TYPES[instance_type]['vcpus'], instance_type) + ) + + instance_vcpus = sorted(instance_vcpus, key=lambda item: item[0], reverse=True) + # Loop through, + # if biggest instance type smaller than target, and len(instance_types)> 1, then use biggest type + # if biggest instance type bigger than target, and len(instance_types)> 1, then remove it and move on + + # if biggest instance type bigger than target and len(instan_types) == 1 then add instance and finish + # if biggest instance type smaller than target and len(instan_types) == 1 then loop adding instances until target == 0 + # ^^ boils down to keep adding last till target vcpus is negative + # #Algorithm ;-) ... Could probably be done better with some quality lambdas + while target > 0: + current_vcpu, current_instance = instance_vcpus[0] + + if len(instance_vcpus) > 1: + if current_vcpu <= target: + target -= current_vcpu + instances.append(current_instance) + else: + # try next biggest instance + instance_vcpus.pop(0) + else: + # Were on the last instance + target -= current_vcpu + instances.append(current_instance) + + return instances + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 0368906f0..80aedcf70 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -14,11 +14,17 @@ class BatchResponse(BaseResponse): @property def batch_backend(self): + """ + :return: Batch Backend + :rtype: moto.batch.models.BatchBackend + """ return batch_backends[self.region] @property def json(self): - if not hasattr(self, '_json'): + if self.body is None: + self._json = {} + elif not hasattr(self, '_json'): self._json = json.loads(self.body) return self._json @@ -56,3 +62,14 @@ class BatchResponse(BaseResponse): } return json.dumps(result) + + # DescribeComputeEnvironments + def describecomputeenvironments(self): + compute_environments = self._get_param('computeEnvironments') + max_results = self._get_param('maxResults') # Ignored, should be int + next_token = self._get_param('nextToken') # Ignored + + envs = self.batch_backend.describe_compute_environments(compute_environments, max_results=max_results, next_token=next_token) + + result = {'computeEnvironments': envs} + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 93f8a2f23..9ad3db06f 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -7,4 +7,5 @@ url_bases = [ url_paths = { '{0}/v1/createcomputeenvironment': BatchResponse.dispatch, + '{0}/v1/describecomputeenvironments': BatchResponse.dispatch, } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index 3aae48e1e..aceb95804 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import boto3 import sure # noqa -from moto import mock_batch, mock_iam, mock_ec2 +from moto import mock_batch, mock_iam, mock_ec2, mock_ecs DEFAULT_REGION = 'eu-central-1' @@ -11,6 +11,7 @@ DEFAULT_REGION = 'eu-central-1' def _get_clients(): return boto3.client('ec2', region_name=DEFAULT_REGION), \ boto3.client('iam', region_name=DEFAULT_REGION), \ + boto3.client('ecs', region_name=DEFAULT_REGION), \ boto3.client('batch', region_name=DEFAULT_REGION) @@ -46,10 +47,11 @@ def _setup(ec2_client, iam_client): # Yes, yes it talks to all the things @mock_ec2 +@mock_ecs @mock_iam @mock_batch -def test_create_compute_environment(): - ec2_client, iam_client, batch_client = _get_clients() +def test_create_managed_compute_environment(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -59,11 +61,12 @@ def test_create_compute_environment(): state='ENABLED', computeResources={ 'type': 'EC2', - 'minvCpus': 123, - 'maxvCpus': 123, - 'desiredvCpus': 123, + 'minvCpus': 5, + 'maxvCpus': 10, + 'desiredvCpus': 5, 'instanceTypes': [ - 'some_instance_type', + 't2.small', + 't2.medium' ], 'imageId': 'some_image_id', 'subnets': [ @@ -85,4 +88,71 @@ def test_create_compute_environment(): resp.should.contain('computeEnvironmentArn') resp['computeEnvironmentName'].should.equal(compute_name) + # Given a t2.medium is 2 vcpu and t2.small is 1, therefore 2 mediums and 1 small should be created + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(3) + + # Should have created 1 ECS cluster + resp = ecs_client.list_clusters() + resp.should.contain('clusterArns') + len(resp['clusterArns']).should.equal(1) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_unmanaged_compute_environment(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + resp.should.contain('computeEnvironmentArn') + resp['computeEnvironmentName'].should.equal(compute_name) + + # Its unmanaged so no instances should be created + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(0) + + # Should have created 1 ECS cluster + resp = ecs_client.list_clusters() + resp.should.contain('clusterArns') + len(resp['clusterArns']).should.equal(1) + # TODO create 1000s of tests to test complex option combinations of create environment + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_describe_compute_environment(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(1) + resp['computeEnvironments'][0]['computeEnvironmentName'].should.equal(compute_name) + + # Test filtering + resp = batch_client.describe_compute_environments( + computeEnvironments=['test1'] + ) + len(resp['computeEnvironments']).should.equal(0) + From 9af88bf20636fe9009060b5dd0cec42f6c8736d6 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 29 Sep 2017 23:43:03 +0100 Subject: [PATCH 05/22] Fixed batch errors --- moto/batch/responses.py | 7 +++++-- moto/batch/urls.py | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 80aedcf70..590cc27a4 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -22,10 +22,13 @@ class BatchResponse(BaseResponse): @property def json(self): - if self.body is None: + if self.body is None or self.body == '': self._json = {} elif not hasattr(self, '_json'): - self._json = json.loads(self.body) + try: + self._json = json.loads(self.body) + except json.JSONDecodeError: + print() return self._json def _get_param(self, param_name, if_none=None): diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 9ad3db06f..18de99199 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -6,6 +6,6 @@ url_bases = [ ] url_paths = { - '{0}/v1/createcomputeenvironment': BatchResponse.dispatch, - '{0}/v1/describecomputeenvironments': BatchResponse.dispatch, + '{0}/v1/createcomputeenvironment$': BatchResponse.dispatch, + '{0}/v1/describecomputeenvironments$': BatchResponse.dispatch, } From 88a11b21aed178d849ff20523350099c83be5e3b Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 3 Oct 2017 22:35:30 +0100 Subject: [PATCH 06/22] Added DeleteComputeEnvironment and UpdateComputeEnvironment --- moto/batch/exceptions.py | 5 ++ moto/batch/models.py | 70 +++++++++++++++++++-- moto/batch/responses.py | 34 ++++++++++ moto/batch/urls.py | 2 + tests/test_batch/test_batch.py | 109 +++++++++++++++++++++++++++++++++ 5 files changed, 216 insertions(+), 4 deletions(-) diff --git a/moto/batch/exceptions.py b/moto/batch/exceptions.py index cd6031a95..a71e54ce3 100644 --- a/moto/batch/exceptions.py +++ b/moto/batch/exceptions.py @@ -30,3 +30,8 @@ class ValidationError(AWSError): class InternalFailure(AWSError): CODE = 'InternalFailure' STATUS = 500 + + +class ClientException(AWSError): + CODE = 'ClientException' + STATUS = 400 diff --git a/moto/batch/models.py b/moto/batch/models.py index 7ed75e749..8572a46c0 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -9,7 +9,7 @@ from moto.iam import iam_backends from moto.ec2 import ec2_backends from moto.ecs import ecs_backends -from .exceptions import InvalidParameterValueException, InternalFailure +from .exceptions import InvalidParameterValueException, InternalFailure, ClientException from .utils import make_arn_for_compute_env from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES @@ -31,12 +31,14 @@ class ComputeEnvironment(BaseModel): self.instances = [] self.ecs_arn = None + self.ecs_name = None def add_instance(self, instance): self.instances.append(instance) - def set_ecs_arn(self, arn): + def set_ecs(self, arn, name): self.ecs_arn = arn + self.ecs_name = name class BatchBackend(BaseBackend): @@ -75,7 +77,7 @@ class BatchBackend(BaseBackend): self.__dict__ = {} self.__init__(region_name) - def get_compute_environment(self, arn): + def get_compute_environment_arn(self, arn): return self._compute_environments.get(arn) def get_compute_environment_by_name(self, name): @@ -84,6 +86,20 @@ class BatchBackend(BaseBackend): return comp_env return None + def get_compute_environment(self, identifier): + """ + Get compute environment by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Compute Environment or None + :rtype: ComputeEnvironment or None + """ + env = self.get_compute_environment_arn(identifier) + if env is None: + env = self.get_compute_environment_by_name(identifier) + return env + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -173,7 +189,7 @@ class BatchBackend(BaseBackend): # Should be of format P2OnDemand_Batch_UUID cluster_name = 'OnDemand_Batch_' + str(uuid.uuid4()) ecs_cluster = self.ecs_backend.create_cluster(cluster_name) - new_comp_env.set_ecs_arn(ecs_cluster.arn) + new_comp_env.set_ecs(ecs_cluster.arn, cluster_name) return compute_environment_name, new_comp_env.arn @@ -271,6 +287,52 @@ class BatchBackend(BaseBackend): return instances + def delete_compute_environment(self, compute_environment_name): + if compute_environment_name is None: + raise InvalidParameterValueException('Missing computeEnvironment parameter') + + compute_env = self.get_compute_environment(compute_environment_name) + + if compute_env is not None: + # Pop ComputeEnvironment + self._compute_environments.pop(compute_env.arn) + + # Delete ECS cluster + self.ecs_backend.delete_cluster(compute_env.ecs_name) + + if compute_env.type == 'MANAGED': + # Delete compute envrionment + instance_ids = [instance.id for instance in compute_env.instances] + self.ec2_backend.terminate_instances(instance_ids) + + def update_compute_environment(self, compute_environment_name, state, compute_resources, service_role): + # Validate + compute_env = self.get_compute_environment(compute_environment_name) + if compute_env is None: + raise ClientException('Compute environment {0} does not exist') + + # Look for IAM role + if service_role is not None: + try: + role = self.iam_backend.get_role_by_arn(service_role) + except IAMNotFoundException: + raise InvalidParameterValueException('Could not find IAM role {0}'.format(service_role)) + + compute_env.service_role = role + + if state is not None: + if state not in ('ENABLED', 'DISABLED'): + raise InvalidParameterValueException('state {0} must be one of ENABLED | DISABLED'.format(state)) + + compute_env.state = state + + if compute_resources is not None: + # TODO Implement resizing of instances based on changing vCpus + # compute_resources CAN contain desiredvCpus, maxvCpus, minvCpus, and can contain none of them. + pass + + return compute_env.name, compute_env.arn + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 590cc27a4..86ee4fdfe 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -76,3 +76,37 @@ class BatchResponse(BaseResponse): result = {'computeEnvironments': envs} return json.dumps(result) + + # DeleteComputeEnvironment + def deletecomputeenvironment(self): + compute_environment = self._get_param('computeEnvironment') + + try: + self.batch_backend.delete_compute_environment(compute_environment) + except AWSError as err: + return err.response() + + return '' + + def updatecomputeenvironment(self): + compute_env_name = self._get_param('computeEnvironment') + compute_resource = self._get_param('computeResources') + service_role = self._get_param('serviceRole') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.update_compute_environment( + compute_environment_name=compute_env_name, + compute_resources=compute_resource, + service_role=service_role, + state=state + ) + except AWSError as err: + return err.response() + + result = { + 'computeEnvironmentArn': arn, + 'computeEnvironmentName': name + } + + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 18de99199..ef8f7927a 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -8,4 +8,6 @@ url_bases = [ url_paths = { '{0}/v1/createcomputeenvironment$': BatchResponse.dispatch, '{0}/v1/describecomputeenvironments$': BatchResponse.dispatch, + '{0}/v1/deletecomputeenvironment': BatchResponse.dispatch, + '{0}/v1/updatecomputeenvironment': BatchResponse.dispatch, } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index aceb95804..159f255c3 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -156,3 +156,112 @@ def test_describe_compute_environment(): ) len(resp['computeEnvironments']).should.equal(0) + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_unmanaged_compute_environment(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + batch_client.delete_compute_environment( + computeEnvironment=compute_name, + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(0) + + resp = ecs_client.list_clusters() + len(resp.get('clusterArns', [])).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_managed_compute_environment(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='MANAGED', + state='ENABLED', + computeResources={ + 'type': 'EC2', + 'minvCpus': 5, + 'maxvCpus': 10, + 'desiredvCpus': 5, + 'instanceTypes': [ + 't2.small', + 't2.medium' + ], + 'imageId': 'some_image_id', + 'subnets': [ + subnet_id, + ], + 'securityGroupIds': [ + sg_id, + ], + 'ec2KeyPair': 'string', + 'instanceRole': iam_arn, + 'tags': { + 'string': 'string' + }, + 'bidPercentage': 123, + 'spotIamFleetRole': 'string' + }, + serviceRole=iam_arn + ) + + batch_client.delete_compute_environment( + computeEnvironment=compute_name, + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(0) + + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(3) + for reservation in resp['Reservations']: + reservation['Instances'][0]['State']['Name'].should.equal('terminated') + + resp = ecs_client.list_clusters() + len(resp.get('clusterArns', [])).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_update_unmanaged_compute_environment_state(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + batch_client.update_compute_environment( + computeEnvironment=compute_name, + state='DISABLED' + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(1) + resp['computeEnvironments'][0]['state'].should.equal('DISABLED') From 15218df12fafd451ba03317696f7a71323a1e0dc Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 3 Oct 2017 23:21:06 +0100 Subject: [PATCH 07/22] Added CreateJobQueue and DescribeJobQueue --- moto/batch/models.py | 127 ++++++++++++++++++++++++++++++++- moto/batch/responses.py | 36 ++++++++++ moto/batch/urls.py | 2 + moto/batch/utils.py | 4 ++ tests/test_batch/test_batch.py | 71 ++++++++++++++++++ 5 files changed, 237 insertions(+), 3 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 8572a46c0..e336a60d7 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -10,7 +10,7 @@ from moto.ec2 import ec2_backends from moto.ecs import ecs_backends from .exceptions import InvalidParameterValueException, InternalFailure, ClientException -from .utils import make_arn_for_compute_env +from .utils import make_arn_for_compute_env, make_arn_for_job_queue from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException @@ -41,12 +41,50 @@ class ComputeEnvironment(BaseModel): self.ecs_name = name +class JobQueue(BaseModel): + def __init__(self, name, priority, state, environments, env_order_json, region_name): + """ + :param name: Job queue name + :type name: str + :param priority: Job queue priority + :type priority: int + :param state: Either ENABLED or DISABLED + :type state: str + :param environments: Compute Environments + :type environments: list of ComputeEnvironment + :param env_order_json: Compute Environments JSON for use when describing + :type env_order_json: list of dict + :param region_name: Region name + :type region_name: str + """ + self.name = name + self.priority = priority + self.state = state + self.environments = environments + self.env_order_json = env_order_json + self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name) + self.status = 'VALID' + + def describe(self): + result = { + 'computeEnvironmentOrder': self.env_order_json, + 'jobQueueArn': self.arn, + 'jobQueueName': self.name, + 'priority': self.priority, + 'state': self.state, + 'status': self.status + } + + return result + + class BatchBackend(BaseBackend): def __init__(self, region_name=None): super(BatchBackend, self).__init__() self.region_name = region_name self._compute_environments = {} + self._job_queues = {} @property def iam_backend(self): @@ -77,7 +115,7 @@ class BatchBackend(BaseBackend): self.__dict__ = {} self.__init__(region_name) - def get_compute_environment_arn(self, arn): + def get_compute_environment_by_arn(self, arn): return self._compute_environments.get(arn) def get_compute_environment_by_name(self, name): @@ -95,11 +133,34 @@ class BatchBackend(BaseBackend): :return: Compute Environment or None :rtype: ComputeEnvironment or None """ - env = self.get_compute_environment_arn(identifier) + env = self.get_compute_environment_by_arn(identifier) if env is None: env = self.get_compute_environment_by_name(identifier) return env + def get_job_queue_by_arn(self, arn): + return self._job_queues.get(arn) + + def get_job_queue_by_name(self, name): + for comp_env in self._job_queues.values(): + if comp_env.name == name: + return comp_env + return None + + def get_job_queue(self, identifier): + """ + Get job queue by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job Queue or None + :rtype: JobQueue or None + """ + env = self.get_job_queue_by_arn(identifier) + if env is None: + env = self.get_job_queue_by_name(identifier) + return env + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -333,6 +394,66 @@ class BatchBackend(BaseBackend): return compute_env.name, compute_env.arn + def create_job_queue(self, queue_name, priority, state, compute_env_order): + """ + Create a job queue + + :param queue_name: Queue name + :type queue_name: str + :param priority: Queue priority + :type priority: int + :param state: Queue state + :type state: string + :param compute_env_order: Compute environment list + :type compute_env_order: list of dict + :return: Tuple of Name, ARN + :rtype: tuple of str + """ + for variable, var_name in ((queue_name, 'jobQueueName'), (priority, 'priority'), (state, 'state'), (compute_env_order, 'computeEnvironmentOrder')): + if variable is None: + raise ClientException('{0} must be provided'.format(var_name)) + + if state not in ('ENABLED', 'DISABLED'): + raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state)) + if self.get_job_queue_by_name(queue_name) is not None: + raise ClientException('Job queue {0} already exists'.format(queue_name)) + + if len(compute_env_order) == 0: + raise ClientException('At least 1 compute environment must be provided') + try: + # orders and extracts computeEnvironment names + ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])] + env_objects = [] + # Check each ARN exists, then make a list of compute env's + for arn in ordered_compute_environments: + env = self.get_compute_environment_by_arn(arn) + if env is None: + raise ClientException('Compute environment {0} does not exist'.format(arn)) + env_objects.append(env) + except Exception: + raise ClientException('computeEnvironmentOrder is malformed') + + # Create new Job Queue + queue = JobQueue(queue_name, priority, state, env_objects, compute_env_order, self.region_name) + self._job_queues[queue.arn] = queue + + return queue_name, queue.arn + + def describe_job_queues(self, job_queues=None, max_results=None, next_token=None): + envs = set() + if job_queues is not None: + envs = set(job_queues) + + result = [] + for arn, job_queue in self._job_queues.items(): + # Filter shortcut + if len(envs) > 0 and arn not in envs and job_queue.name not in envs: + continue + + result.append(job_queue.describe()) + + return result + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 86ee4fdfe..661b9c7c2 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -88,6 +88,7 @@ class BatchResponse(BaseResponse): return '' + # UpdateComputeEnvironment def updatecomputeenvironment(self): compute_env_name = self._get_param('computeEnvironment') compute_resource = self._get_param('computeResources') @@ -110,3 +111,38 @@ class BatchResponse(BaseResponse): } return json.dumps(result) + + # CreateJobQueue + def createjobqueue(self): + compute_env_order = self._get_param('computeEnvironmentOrder') + queue_name = self._get_param('jobQueueName') + priority = self._get_param('priority') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.create_job_queue( + queue_name=queue_name, + priority=priority, + state=state, + compute_env_order=compute_env_order + ) + except AWSError as err: + return err.response() + + result = { + 'jobQueueArn': arn, + 'jobQueueName': name + } + + return json.dumps(result) + + # DescribeJobQueues + def describejobqueues(self): + job_queues = self._get_param('jobQueues') + max_results = self._get_param('maxResults') # Ignored, should be int + next_token = self._get_param('nextToken') # Ignored + + queues = self.batch_backend.describe_job_queues(job_queues, max_results=max_results, next_token=next_token) + + result = {'jobQueues': queues} + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index ef8f7927a..227e78ecf 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -10,4 +10,6 @@ url_paths = { '{0}/v1/describecomputeenvironments$': BatchResponse.dispatch, '{0}/v1/deletecomputeenvironment': BatchResponse.dispatch, '{0}/v1/updatecomputeenvironment': BatchResponse.dispatch, + '{0}/v1/createjobqueue': BatchResponse.dispatch, + '{0}/v1/describejobqueues': BatchResponse.dispatch, } diff --git a/moto/batch/utils.py b/moto/batch/utils.py index d323a9bf7..68c6a3581 100644 --- a/moto/batch/utils.py +++ b/moto/batch/utils.py @@ -3,3 +3,7 @@ from __future__ import unicode_literals def make_arn_for_compute_env(account_id, name, region_name): return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name) + + +def make_arn_for_job_queue(account_id, name, region_name): + return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index 159f255c3..6bf68a6fc 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import boto3 +from botocore.exceptions import ClientError import sure # noqa from moto import mock_batch, mock_iam, mock_ec2, mock_ecs @@ -265,3 +266,73 @@ def test_update_unmanaged_compute_environment_state(): resp = batch_client.describe_compute_environments() len(resp['computeEnvironments']).should.equal(1) resp['computeEnvironments'][0]['state'].should.equal('DISABLED') + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + resp.should.contain('jobQueueArn') + resp.should.contain('jobQueueName') + queue_arn = resp['jobQueueArn'] + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + resp['jobQueues'][0]['jobQueueArn'].should.equal(queue_arn) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_job_queue_bad_arn(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + try: + batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + 'LALALA' + }, + ] + ) + except ClientError as err: + err.response['Error']['Code'].should.equal('ClientException') From b8f24298fda5458b1de889c52591c1bd287e4e5b Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Tue, 3 Oct 2017 23:28:10 +0100 Subject: [PATCH 08/22] Added filtering test part --- tests/test_batch/test_batch.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index 6bf68a6fc..b082d656e 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -302,8 +302,13 @@ def test_create_job_queue(): resp = batch_client.describe_job_queues() resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(1) resp['jobQueues'][0]['jobQueueArn'].should.equal(queue_arn) + resp = batch_client.describe_job_queues(jobQueues=['test_invalid_queue']) + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(0) + @mock_ec2 @mock_ecs From 8cb381f7252bc9aeff3a5ac52fb35ee2d4e166e5 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 4 Oct 2017 00:20:39 +0100 Subject: [PATCH 09/22] Possible import order fix --- moto/__init__.py | 2 +- moto/backends.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/moto/__init__.py b/moto/__init__.py index f7a74211e..3a3d6f0ac 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -9,7 +9,6 @@ from .acm import mock_acm # flake8: noqa from .apigateway import mock_apigateway, mock_apigateway_deprecated # flake8: noqa from .autoscaling import mock_autoscaling, mock_autoscaling_deprecated # flake8: noqa from .awslambda import mock_lambda, mock_lambda_deprecated # flake8: noqa -from .batch import mock_batch # flake8: noqa from .cloudformation import mock_cloudformation, mock_cloudformation_deprecated # flake8: noqa from .cloudwatch import mock_cloudwatch, mock_cloudwatch_deprecated # flake8: noqa from .datapipeline import mock_datapipeline, mock_datapipeline_deprecated # flake8: noqa @@ -41,6 +40,7 @@ from .route53 import mock_route53, mock_route53_deprecated # flake8: noqa from .swf import mock_swf, mock_swf_deprecated # flake8: noqa from .xray import mock_xray # flake8: noqa from .logs import mock_logs, mock_logs_deprecated # flake8: noqa +from .batch import mock_batch # flake8: noqa try: diff --git a/moto/backends.py b/moto/backends.py index 1a401ca06..d1ce0730e 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -4,7 +4,6 @@ from moto.acm import acm_backends from moto.apigateway import apigateway_backends from moto.autoscaling import autoscaling_backends from moto.awslambda import lambda_backends -from moto.batch import batch_backends from moto.cloudformation import cloudformation_backends from moto.cloudwatch import cloudwatch_backends from moto.core import moto_api_backends @@ -36,6 +35,7 @@ from moto.sqs import sqs_backends from moto.ssm import ssm_backends from moto.sts import sts_backends from moto.xray import xray_backends +from moto.batch import batch_backends BACKENDS = { 'acm': acm_backends, From 8441e44e802426cbd9aa82852708a01b6b2b7fe2 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 4 Oct 2017 01:09:28 +0100 Subject: [PATCH 10/22] Possible fix V2 --- moto/elbv2/urls.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/moto/elbv2/urls.py b/moto/elbv2/urls.py index 13a8e056f..b7d8adb58 100644 --- a/moto/elbv2/urls.py +++ b/moto/elbv2/urls.py @@ -1,10 +1,11 @@ from __future__ import unicode_literals from .responses import ELBV2Response +from ..elb.urls import api_version_elb_backend url_bases = [ "https?://elasticloadbalancing.(.+).amazonaws.com", ] url_paths = { - '{0}/$': ELBV2Response.dispatch, + '{0}/$': api_version_elb_backend, } From 2249eee49df9d4c7edbbea96013c8bbddb06a072 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 4 Oct 2017 01:13:34 +0100 Subject: [PATCH 11/22] Potential fix V3 --- moto/elbv2/urls.py | 1 - 1 file changed, 1 deletion(-) diff --git a/moto/elbv2/urls.py b/moto/elbv2/urls.py index b7d8adb58..af51f7d3a 100644 --- a/moto/elbv2/urls.py +++ b/moto/elbv2/urls.py @@ -1,5 +1,4 @@ from __future__ import unicode_literals -from .responses import ELBV2Response from ..elb.urls import api_version_elb_backend url_bases = [ From 4a45acc216a6d4f66d194ea83f3cc7f3c7e93c26 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 4 Oct 2017 18:52:12 +0100 Subject: [PATCH 12/22] Implemented Update and Delete job queue --- moto/batch/models.py | 58 ++++++++++++++++++++++++ moto/batch/responses.py | 32 ++++++++++++++ moto/batch/urls.py | 2 + tests/test_batch/test_batch.py | 80 ++++++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+) diff --git a/moto/batch/models.py b/moto/batch/models.py index e336a60d7..6eb02c39c 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -454,6 +454,64 @@ class BatchBackend(BaseBackend): return result + def update_job_queue(self, queue_name, priority, state, compute_env_order): + """ + Update a job queue + + :param queue_name: Queue name + :type queue_name: str + :param priority: Queue priority + :type priority: int + :param state: Queue state + :type state: string + :param compute_env_order: Compute environment list + :type compute_env_order: list of dict + :return: Tuple of Name, ARN + :rtype: tuple of str + """ + if queue_name is None: + raise ClientException('jobQueueName must be provided') + + job_queue = self.get_job_queue(queue_name) + if job_queue is None: + raise ClientException('Job queue {0} does not exist'.format(queue_name)) + + if state is not None: + if state not in ('ENABLED', 'DISABLED'): + raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state)) + + job_queue.state = state + + if compute_env_order is not None: + if len(compute_env_order) == 0: + raise ClientException('At least 1 compute environment must be provided') + try: + # orders and extracts computeEnvironment names + ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])] + env_objects = [] + # Check each ARN exists, then make a list of compute env's + for arn in ordered_compute_environments: + env = self.get_compute_environment_by_arn(arn) + if env is None: + raise ClientException('Compute environment {0} does not exist'.format(arn)) + env_objects.append(env) + except Exception: + raise ClientException('computeEnvironmentOrder is malformed') + + job_queue.env_order_json = compute_env_order + job_queue.environments = env_objects + + if priority is not None: + job_queue.priority = priority + + return queue_name, job_queue.arn + + def delete_job_queue(self, queue_name): + job_queue = self.get_job_queue(queue_name) + + if job_queue is not None: + del self._job_queues[job_queue.arn] + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 661b9c7c2..7c870382e 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -146,3 +146,35 @@ class BatchResponse(BaseResponse): result = {'jobQueues': queues} return json.dumps(result) + + # UpdateJobQueue + def updatejobqueue(self): + compute_env_order = self._get_param('computeEnvironmentOrder') + queue_name = self._get_param('jobQueue') + priority = self._get_param('priority') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.update_job_queue( + queue_name=queue_name, + priority=priority, + state=state, + compute_env_order=compute_env_order + ) + except AWSError as err: + return err.response() + + result = { + 'jobQueueArn': arn, + 'jobQueueName': name + } + + return json.dumps(result) + + # DeleteJobQueue + def deletejobqueue(self): + queue_name = self._get_param('jobQueue') + + self.batch_backend.delete_job_queue(queue_name) + + return '' diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 227e78ecf..bc186bd29 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -12,4 +12,6 @@ url_paths = { '{0}/v1/updatecomputeenvironment': BatchResponse.dispatch, '{0}/v1/createjobqueue': BatchResponse.dispatch, '{0}/v1/describejobqueues': BatchResponse.dispatch, + '{0}/v1/updatejobqueue': BatchResponse.dispatch, + '{0}/v1/deletejobqueue': BatchResponse.dispatch } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index b082d656e..e7c4cf629 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -341,3 +341,83 @@ def test_job_queue_bad_arn(): ) except ClientError as err: err.response['Error']['Code'].should.equal('ClientException') + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_update_job_queue(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + batch_client.update_job_queue( + jobQueue=queue_arn, + priority=5 + ) + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(1) + resp['jobQueues'][0]['priority'].should.equal(5) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_update_job_queue(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + batch_client.delete_job_queue( + jobQueue=queue_arn + ) + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(0) From 558f246115784cb59cc6a17b1fd67f6fac16894f Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 4 Oct 2017 20:17:29 +0100 Subject: [PATCH 13/22] Added RegisterJobDefinition --- moto/batch/models.py | 90 +++++++++++++++++++++++++++++++++- moto/batch/responses.py | 27 ++++++++++ moto/batch/urls.py | 3 +- moto/batch/utils.py | 4 ++ tests/test_batch/test_batch.py | 26 ++++++++++ 5 files changed, 148 insertions(+), 2 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 6eb02c39c..2129320e7 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -10,7 +10,7 @@ from moto.ec2 import ec2_backends from moto.ecs import ecs_backends from .exceptions import InvalidParameterValueException, InternalFailure, ClientException -from .utils import make_arn_for_compute_env, make_arn_for_job_queue +from .utils import make_arn_for_compute_env, make_arn_for_job_queue, make_arn_for_task_def from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException @@ -78,6 +78,52 @@ class JobQueue(BaseModel): return result +class JobDefinition(BaseModel): + def __init__(self, name, parameters, _type, container_properties, region_name, revision=0, retry_strategy=0): + self.name = name + self.retries = retry_strategy + self.type = _type + self.revision = revision + self._region = region_name + self.container_properties = container_properties + self.arn = None + + self.parameters = {} + if parameters is not None: + if not isinstance(parameters, dict): + raise ClientException('parameters must be a string to string map') + self.parameters = parameters + + if _type not in ('container',): + raise ClientException('type must be one of "container"') + + self._update_arn() + + # For future use when containers arnt the only thing in batch + if _type != 'container': + raise NotImplementedError() + + self._validate_container_properties() + + def _update_arn(self): + self.revision += 1 + self.arn = make_arn_for_task_def(DEFAULT_ACCOUNT_ID, self.name, self.revision, self._region) + + def _validate_container_properties(self): + if 'image' not in self.container_properties: + raise ClientException('containerProperties must contain image') + + if 'memory' not in self.container_properties: + raise ClientException('containerProperties must contain memory') + if self.container_properties['memory'] < 4: + raise ClientException('container memory limit must be greater than 4') + + if 'vcpus' not in self.container_properties: + raise ClientException('containerProperties must contain vcpus') + if self.container_properties['vcpus'] < 1: + raise ClientException('container vcpus limit must be greater than 0') + + class BatchBackend(BaseBackend): def __init__(self, region_name=None): super(BatchBackend, self).__init__() @@ -85,6 +131,7 @@ class BatchBackend(BaseBackend): self._compute_environments = {} self._job_queues = {} + self._job_definitions = {} @property def iam_backend(self): @@ -161,6 +208,29 @@ class BatchBackend(BaseBackend): env = self.get_job_queue_by_name(identifier) return env + def get_job_definition_by_arn(self, arn): + return self._job_definitions.get(arn) + + def get_job_definition_by_name(self, name): + for comp_env in self._job_definitions.values(): + if comp_env.name == name: + return comp_env + return None + + def get_job_definition(self, identifier): + """ + Get job queue by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job Queue or None + :rtype: JobQueue or None + """ + env = self.get_job_definition_by_arn(identifier) + if env is None: + env = self.get_job_definition_by_name(identifier) + return env + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -512,6 +582,24 @@ class BatchBackend(BaseBackend): if job_queue is not None: del self._job_queues[job_queue.arn] + def register_job_definition(self, def_name, parameters, _type, retry_strategy, container_properties): + if def_name is None: + raise ClientException('jobDefinitionName must be provided') + + if self.get_job_definition_by_name(def_name) is not None: + raise ClientException('A job definition called {0} already exists'.format(def_name)) + + if retry_strategy is not None: + try: + retry_strategy = retry_strategy['attempts'] + except Exception: + raise ClientException('retryStrategy is malformed') + + job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy) + self._job_definitions[job_def.arn] = job_def + + return def_name, job_def.arn, job_def.revision + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 7c870382e..dec740221 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -178,3 +178,30 @@ class BatchResponse(BaseResponse): self.batch_backend.delete_job_queue(queue_name) return '' + + # RegisterJobDefinition + def registerjobdefinition(self): + container_properties = self._get_param('containerProperties') + def_name = self._get_param('jobDefinitionName') + parameters = self._get_param('parameters') + retry_strategy = self._get_param('retryStrategy') + _type = self._get_param('type') + + try: + name, arn, revision = self.batch_backend.register_job_definition( + def_name=def_name, + parameters=parameters, + _type=_type, + retry_strategy=retry_strategy, + container_properties=container_properties + ) + except AWSError as err: + return err.response() + + result = { + 'jobDefinitionArn': arn, + 'jobDefinitionName': name, + 'revision': revision + } + + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index bc186bd29..cd5ccb00c 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -13,5 +13,6 @@ url_paths = { '{0}/v1/createjobqueue': BatchResponse.dispatch, '{0}/v1/describejobqueues': BatchResponse.dispatch, '{0}/v1/updatejobqueue': BatchResponse.dispatch, - '{0}/v1/deletejobqueue': BatchResponse.dispatch + '{0}/v1/deletejobqueue': BatchResponse.dispatch, + '{0}/v1/registerjobdefinition': BatchResponse.dispatch } diff --git a/moto/batch/utils.py b/moto/batch/utils.py index 68c6a3581..6cdd381f7 100644 --- a/moto/batch/utils.py +++ b/moto/batch/utils.py @@ -7,3 +7,7 @@ def make_arn_for_compute_env(account_id, name, region_name): def make_arn_for_job_queue(account_id, name, region_name): return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name) + + +def make_arn_for_task_def(account_id, name, revision, region_name): + return "arn:aws:batch:{0}:{1}:job-definition/{2}:{3}".format(region_name, account_id, name, revision) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index e7c4cf629..6eba45d27 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -421,3 +421,29 @@ def test_update_job_queue(): resp = batch_client.describe_job_queues() resp.should.contain('jobQueues') len(resp['jobQueues']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_register_task_definition(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + resp.should.contain('jobDefinitionArn') + resp.should.contain('jobDefinitionName') + resp.should.contain('revision') + + assert resp['jobDefinitionArn'].endswith('{0}:{1}'.format(resp['jobDefinitionName'], resp['revision'])) From 0ca3fcc7a23c453a6c2c675af01b416f2962a91b Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Thu, 5 Oct 2017 00:00:40 +0100 Subject: [PATCH 14/22] Added DescribeJobDefinitions --- moto/batch/models.py | 137 +++++++++++++++++++++++++++------ moto/batch/responses.py | 21 +++++ moto/batch/urls.py | 4 +- tests/test_batch/test_batch.py | 124 +++++++++++++++++++++++++++++ 4 files changed, 262 insertions(+), 24 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 2129320e7..bfbdcf4a5 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -87,29 +87,30 @@ class JobDefinition(BaseModel): self._region = region_name self.container_properties = container_properties self.arn = None + self.status = 'INACTIVE' - self.parameters = {} - if parameters is not None: - if not isinstance(parameters, dict): - raise ClientException('parameters must be a string to string map') - self.parameters = parameters - - if _type not in ('container',): - raise ClientException('type must be one of "container"') + if parameters is None: + parameters = {} + self.parameters = parameters + self._validate() self._update_arn() - # For future use when containers arnt the only thing in batch - if _type != 'container': - raise NotImplementedError() - - self._validate_container_properties() - def _update_arn(self): self.revision += 1 self.arn = make_arn_for_task_def(DEFAULT_ACCOUNT_ID, self.name, self.revision, self._region) - def _validate_container_properties(self): + def _validate(self): + if self.type not in ('container',): + raise ClientException('type must be one of "container"') + + # For future use when containers arnt the only thing in batch + if self.type != 'container': + raise NotImplementedError() + + if not isinstance(self.parameters, dict): + raise ClientException('parameters must be a string to string map') + if 'image' not in self.container_properties: raise ClientException('containerProperties must contain image') @@ -123,6 +124,37 @@ class JobDefinition(BaseModel): if self.container_properties['vcpus'] < 1: raise ClientException('container vcpus limit must be greater than 0') + def update(self, parameters, _type, container_properties, retry_strategy): + if parameters is None: + parameters = self.parameters + + if _type is None: + _type = self.type + + if container_properties is None: + container_properties = self.container_properties + + if retry_strategy is None: + retry_strategy = self.retries + + return JobDefinition(self.name, parameters, _type, container_properties, region_name=self._region, revision=self.revision, retry_strategy=retry_strategy) + + def describe(self): + result = { + 'jobDefinitionArn': self.arn, + 'jobDefinitionName': self.name, + 'parameters': self.parameters, + 'revision': self.revision, + 'status': self.status, + 'type': self.type + } + if self.container_properties is not None: + result['containerProperties'] = self.container_properties + if self.retries is not None and self.retries > 0: + result['retryStrategy'] = {'attempts': self.retries} + + return result + class BatchBackend(BaseBackend): def __init__(self, region_name=None): @@ -211,26 +243,52 @@ class BatchBackend(BaseBackend): def get_job_definition_by_arn(self, arn): return self._job_definitions.get(arn) - def get_job_definition_by_name(self, name): + def get_job_definition_by_name(self, name):# for comp_env in self._job_definitions.values(): if comp_env.name == name: return comp_env return None + def get_job_definition_by_name_revision(self, name, revision): + for job_def in self._job_definitions.values(): + if job_def.name == name and job_def.revision == revision: + return job_def + return None + def get_job_definition(self, identifier): """ - Get job queue by name or ARN + Get job defintiion by name or ARN :param identifier: Name or ARN :type identifier: str - :return: Job Queue or None - :rtype: JobQueue or None + :return: Job definition or None + :rtype: JobDefinition or None """ env = self.get_job_definition_by_arn(identifier) if env is None: env = self.get_job_definition_by_name(identifier) return env + def get_job_definitions(self, identifier): + """ + Get job defintiion by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job definition or None + :rtype: list of JobDefinition + """ + result = [] + env = self.get_job_definition_by_arn(identifier) + if env is not None: + result.append(env) + else: + for value in self._job_definitions.values(): + if value.name == identifier: + result.append(value) + + return result + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -586,20 +644,53 @@ class BatchBackend(BaseBackend): if def_name is None: raise ClientException('jobDefinitionName must be provided') - if self.get_job_definition_by_name(def_name) is not None: - raise ClientException('A job definition called {0} already exists'.format(def_name)) - + job_def = self.get_job_definition_by_name(def_name) if retry_strategy is not None: try: retry_strategy = retry_strategy['attempts'] except Exception: raise ClientException('retryStrategy is malformed') - job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy) + if job_def is None: + job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy) + else: + # Make new jobdef + job_def = job_def.update(parameters, _type, container_properties, retry_strategy) + self._job_definitions[job_def.arn] = job_def return def_name, job_def.arn, job_def.revision + def deregister_job_definition(self, def_name): + job_def = self.get_job_definition_by_arn(def_name) + if job_def is None and ':' in def_name: + name, revision = def_name.split(':', 1) + job_def = self.get_job_definition_by_name_revision(name, revision) + + if job_def is not None: + del self._job_definitions[job_def.arn] + + def describe_job_definitions(self, job_def_name=None, job_def_list=None, status=None, max_results=None, next_token=None): + jobs = [] + + # As a job name can reference multiple revisions, we get a list of them + if job_def_name is not None: + job_def = self.get_job_definitions(job_def_name) + if job_def is not None: + jobs.extend(job_def) + elif job_def_list is not None: + for job in job_def_list: + job_def = self.get_job_definitions(job) + if job_def is not None: + jobs.extend(job_def) + else: + jobs.extend(self._job_definitions.values()) + + # Got all the job defs were after, filter then by status + if status is not None: + return [job for job in jobs if job.status == status] + return jobs + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index dec740221..0d3900d1d 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -205,3 +205,24 @@ class BatchResponse(BaseResponse): } return json.dumps(result) + + # DeregisterJobDefinition + def deregisterjobdefinition(self): + queue_name = self._get_param('jobDefinition') + + self.batch_backend.deregister_job_definition(queue_name) + + return '' + + # DescribeJobDefinitions + def describejobdefinitions(self): + job_def_name = self._get_param('jobDefinitionName') + job_def_list = self._get_param('jobDefinitions') + max_results = self._get_param('maxResults') + next_token = self._get_param('nextToken') + status = self._get_param('status') + + job_defs = self.batch_backend.describe_job_definitions(job_def_name, job_def_list, status, max_results, next_token) + + result = {'jobDefinitions': [job.describe() for job in job_defs]} + return json.dumps(result) diff --git a/moto/batch/urls.py b/moto/batch/urls.py index cd5ccb00c..3265bb535 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -14,5 +14,7 @@ url_paths = { '{0}/v1/describejobqueues': BatchResponse.dispatch, '{0}/v1/updatejobqueue': BatchResponse.dispatch, '{0}/v1/deletejobqueue': BatchResponse.dispatch, - '{0}/v1/registerjobdefinition': BatchResponse.dispatch + '{0}/v1/registerjobdefinition': BatchResponse.dispatch, + '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch, + '{0}/v1/describejobdefinitions': BatchResponse.dispatch } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index 6eba45d27..ebe710760 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -447,3 +447,127 @@ def test_register_task_definition(): resp.should.contain('revision') assert resp['jobDefinitionArn'].endswith('{0}:{1}'.format(resp['jobDefinitionName'], resp['revision'])) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_reregister_task_definition(): + # Reregistering task with the same name bumps the revision number + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp1 = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + resp1.should.contain('jobDefinitionArn') + resp1.should.contain('jobDefinitionName') + resp1.should.contain('revision') + + assert resp1['jobDefinitionArn'].endswith('{0}:{1}'.format(resp1['jobDefinitionName'], resp1['revision'])) + resp1['revision'].should.equal(1) + + resp2 = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 68, + 'command': ['sleep', '10'] + } + ) + resp2['revision'].should.equal(2) + + resp2['jobDefinitionArn'].should_not.equal(resp1['jobDefinitionArn']) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_task_definition(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + batch_client.deregister_job_definition(jobDefinition=resp['jobDefinitionArn']) + + resp = batch_client.describe_job_definitions() + len(resp['jobDefinitions']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_describe_task_definition(): + ec2_client, iam_client, ecs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + arn1 = resp['jobDefinitionArn'] + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 64, + 'command': ['sleep', '10'] + } + ) + arn2 = resp['jobDefinitionArn'] + resp = batch_client.register_job_definition( + jobDefinitionName='test1', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 64, + 'command': ['sleep', '10'] + } + ) + arn3 = resp['jobDefinitionArn'] + + resp = batch_client.describe_job_definitions( + jobDefinitionName='sleep10' + ) + len(resp['jobDefinitions']).should.equal(2) + + resp = batch_client.describe_job_definitions() + len(resp['jobDefinitions']).should.equal(3) + + resp = batch_client.describe_job_definitions( + jobDefinitions=['sleep10', 'test1'] + ) + len(resp['jobDefinitions']).should.equal(3) + From 6eb755029cf77309c978f953e77ca033a9a8b3db Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Thu, 5 Oct 2017 00:09:10 +0100 Subject: [PATCH 15/22] fix flake8 --- moto/batch/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index bfbdcf4a5..05137296b 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -243,7 +243,7 @@ class BatchBackend(BaseBackend): def get_job_definition_by_arn(self, arn): return self._job_definitions.get(arn) - def get_job_definition_by_name(self, name):# + def get_job_definition_by_name(self, name): for comp_env in self._job_definitions.values(): if comp_env.name == name: return comp_env From e135344f0c339771f629ef9e1b1e364663b57fca Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 6 Oct 2017 01:21:29 +0100 Subject: [PATCH 16/22] Added simple SubmitJob and DescribeJobs --- moto/batch/models.py | 225 +++++++++++++++++++++++++++++++++ moto/batch/responses.py | 37 ++++++ moto/batch/urls.py | 4 +- moto/logs/models.py | 24 +++- moto/logs/responses.py | 4 +- tests/test_batch/test_batch.py | 126 +++++++++++++++--- 6 files changed, 391 insertions(+), 29 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 05137296b..be8fca9d1 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -1,13 +1,22 @@ from __future__ import unicode_literals import boto3 import re +import requests.adapters from itertools import cycle import six +import datetime +import time import uuid +import logging +import docker +import functools +import threading +import dateutil.parser from moto.core import BaseBackend, BaseModel from moto.iam import iam_backends from moto.ec2 import ec2_backends from moto.ecs import ecs_backends +from moto.logs import logs_backends from .exceptions import InvalidParameterValueException, InternalFailure, ClientException from .utils import make_arn_for_compute_env, make_arn_for_job_queue, make_arn_for_task_def @@ -16,10 +25,16 @@ from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException +_orig_adapter_send = requests.adapters.HTTPAdapter.send +logger = logging.getLogger(__name__) DEFAULT_ACCOUNT_ID = 123456789012 COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9_]{1,128}$') +def datetime2int(date): + return int(time.mktime(date.timetuple())) + + class ComputeEnvironment(BaseModel): def __init__(self, compute_environment_name, _type, state, compute_resources, service_role, region_name): self.name = compute_environment_name @@ -65,6 +80,8 @@ class JobQueue(BaseModel): self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name) self.status = 'VALID' + self.jobs = [] + def describe(self): result = { 'computeEnvironmentOrder': self.env_order_json, @@ -156,6 +173,162 @@ class JobDefinition(BaseModel): return result +class Job(threading.Thread, BaseModel): + def __init__(self, name, job_def, job_queue, log_backend): + """ + Docker Job + + :param name: Job Name + :param job_def: Job definition + :type: job_def: JobDefinition + :param job_queue: Job Queue + :param log_backend: Log backend + :type log_backend: moto.logs.models.LogsBackend + """ + threading.Thread.__init__(self) + + self.job_name = name + self.job_id = str(uuid.uuid4()) + self.job_definition = job_def + self.job_queue = job_queue + self.job_state = 'SUBMITTED' # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED + self.job_queue.jobs.append(self) + self.job_started_at = datetime.datetime(1970, 1, 1) + self.job_stopped_at = datetime.datetime(1970, 1, 1) + self.job_stopped = False + + self.stop = False + + self.daemon = True + self.name = 'MOTO-BATCH-' + self.job_id + + self.docker_client = docker.from_env() + self._log_backend = log_backend + + # Unfortunately mocking replaces this method w/o fallback enabled, so we + # need to replace it if we detect it's been mocked + if requests.adapters.HTTPAdapter.send != _orig_adapter_send: + _orig_get_adapter = self.docker_client.api.get_adapter + + def replace_adapter_send(*args, **kwargs): + adapter = _orig_get_adapter(*args, **kwargs) + + if isinstance(adapter, requests.adapters.HTTPAdapter): + adapter.send = functools.partial(_orig_adapter_send, adapter) + return adapter + self.docker_client.api.get_adapter = replace_adapter_send + + def describe(self): + result = { + 'jobDefinition': self.job_definition.arn, + 'jobId': self.job_id, + 'jobName': self.job_name, + 'jobQueue': self.job_queue.arn, + 'startedAt': datetime2int(self.job_started_at), + 'status': self.job_state, + 'dependsOn': [] + } + if self.job_stopped: + result['stoppedAt'] = datetime2int(self.job_stopped_at) + return result + + def run(self): + """ + Run the container. + + Logic is as follows: + Generate container info (eventually from task definition) + Start container + Loop whilst not asked to stop and the container is running. + Get all logs from container between the last time I checked and now. + Convert logs into cloudwatch format + Put logs into cloudwatch + + :return: + """ + try: + self.job_state = 'PENDING' + time.sleep(1) + + image = 'alpine:latest' + cmd = '/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"' + name = '{0}-{1}'.format(self.job_name, self.job_id) + + self.job_state = 'RUNNABLE' + # TODO setup ecs container instance + time.sleep(1) + + self.job_state = 'STARTING' + container = self.docker_client.containers.run( + image, cmd, + detach=True, + name=name + ) + self.job_state = 'RUNNING' + self.job_started_at = datetime.datetime.now() + try: + # Log collection + logs_stdout = [] + logs_stderr = [] + container.reload() + + # Dodgy hack, we can only check docker logs once a second, but we want to loop more + # so we can stop if asked to in a quick manner, should all go away if we go async + # There also be some dodgyness when sending an integer to docker logs and some + # events seem to be duplicated. + now = datetime.datetime.now() + i = 1 + while container.status == 'running' and not self.stop: + time.sleep(0.15) + if i % 10 == 0: + logs_stderr.extend(container.logs(stdout=False, stderr=True, timestamps=True, since=datetime2int(now)).decode().split('\n')) + logs_stdout.extend(container.logs(stdout=True, stderr=False, timestamps=True, since=datetime2int(now)).decode().split('\n')) + now = datetime.datetime.now() + container.reload() + i += 1 + + # Container should be stopped by this point... unless asked to stop + if container.status == 'running': + container.kill() + + self.job_stopped_at = datetime.datetime.now() + # Get final logs + logs_stderr.extend(container.logs(stdout=False, stderr=True, timestamps=True, since=datetime2int(now)).decode().split('\n')) + logs_stdout.extend(container.logs(stdout=True, stderr=False, timestamps=True, since=datetime2int(now)).decode().split('\n')) + + self.job_state = 'SUCCEEDED' if not self.stop else 'FAILED' + + # Process logs + logs_stdout = [x for x in logs_stdout if len(x) > 0] + logs_stderr = [x for x in logs_stderr if len(x) > 0] + logs = [] + for line in logs_stdout + logs_stderr: + date, line = line.split(' ', 1) + date = dateutil.parser.parse(date) + date = int(date.timestamp()) + logs.append({'timestamp': date, 'message': line.strip()}) + + # Send to cloudwatch + log_group = '/aws/batch/job' + stream_name = '{0}/default/{1}'.format(self.job_definition.name, self.job_id) + self._log_backend.ensure_log_group(log_group, None) + self._log_backend.create_log_stream(log_group, stream_name) + self._log_backend.put_log_events(log_group, stream_name, logs, None) + + except Exception as err: + logger.error('Failed to run AWS Batch container {0}. Error {1}'.format(self.name, err)) + self.job_state = 'FAILED' + container.kill() + finally: + container.remove() + except Exception as err: + logger.error('Failed to run AWS Batch container {0}. Error {1}'.format(self.name, err)) + self.job_state = 'FAILED' + + self.job_stopped = True + self.job_stopped_at = datetime.datetime.now() + + class BatchBackend(BaseBackend): def __init__(self, region_name=None): super(BatchBackend, self).__init__() @@ -164,6 +337,7 @@ class BatchBackend(BaseBackend): self._compute_environments = {} self._job_queues = {} self._job_definitions = {} + self._jobs = {} @property def iam_backend(self): @@ -189,8 +363,23 @@ class BatchBackend(BaseBackend): """ return ecs_backends[self.region_name] + @property + def logs_backend(self): + """ + :return: ECS Backend + :rtype: moto.logs.models.LogsBackend + """ + return logs_backends[self.region_name] + def reset(self): region_name = self.region_name + + for job in self._jobs.values(): + if job.job_state not in ('FAILED', 'SUCCEEDED'): + job.stop = True + # Try to join + job.join(0.2) + self.__dict__ = {} self.__init__(region_name) @@ -691,6 +880,42 @@ class BatchBackend(BaseBackend): return [job for job in jobs if job.status == status] return jobs + def submit_job(self, job_name, job_def_id, job_queue, parameters=None, retries=None, depends_on=None, container_overrides=None): + # TODO parameters, retries (which is a dict raw from request), job dependancies and container overrides are ignored for now + + # Look for job definition + job_def = self.get_job_definition_by_arn(job_def_id) + if job_def is None and ':' in job_def_id: + job_def = self.get_job_definition_by_name_revision(*job_def_id.split(':', 1)) + if job_def is None: + raise ClientException('Job definition {0} does not exist'.format(job_def_id)) + + queue = self.get_job_queue(job_queue) + if queue is None: + raise ClientException('Job queue {0} does not exist'.format(job_queue)) + + job = Job(job_name, job_def, queue, log_backend=self.logs_backend) + self._jobs[job.job_id] = job + + # Here comes the fun + job.start() + + return job_name, job.job_id + + def describe_jobs(self, jobs): + job_filter = set() + if jobs is not None: + job_filter = set(jobs) + + result = [] + for key, job in self._jobs.items(): + if len(job_filter) > 0 and key not in job_filter: + continue + + result.append(job.describe()) + + return result + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 0d3900d1d..2bec7ddf1 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -226,3 +226,40 @@ class BatchResponse(BaseResponse): result = {'jobDefinitions': [job.describe() for job in job_defs]} return json.dumps(result) + + # SubmitJob + def submitjob(self): + container_overrides = self._get_param('containerOverrides') + depends_on = self._get_param('dependsOn') + job_def = self._get_param('jobDefinition') + job_name = self._get_param('jobName') + job_queue = self._get_param('jobQueue') + parameters = self._get_param('parameters') + retries = self._get_param('retryStrategy') + + try: + name, job_id = self.batch_backend.submit_job( + job_name, job_def, job_queue, + parameters=parameters, + retries=retries, + depends_on=depends_on, + container_overrides=container_overrides + ) + except AWSError as err: + return err.response() + + result = { + 'jobId': job_id, + 'jobName': name, + } + + return json.dumps(result) + + # DescribeJobs + def describejobs(self): + jobs = self._get_param('jobs') + + try: + return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)}) + except AWSError as err: + return err.response() diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 3265bb535..924e55e6d 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -16,5 +16,7 @@ url_paths = { '{0}/v1/deletejobqueue': BatchResponse.dispatch, '{0}/v1/registerjobdefinition': BatchResponse.dispatch, '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch, - '{0}/v1/describejobdefinitions': BatchResponse.dispatch + '{0}/v1/describejobdefinitions': BatchResponse.dispatch, + '{0}/v1/submitjob': BatchResponse.dispatch, + '{0}/v1/describejobs': BatchResponse.dispatch } diff --git a/moto/logs/models.py b/moto/logs/models.py index 14f511932..09dcb3645 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -22,6 +22,13 @@ class LogEvent: "timestamp": self.timestamp } + def to_response_dict(self): + return { + "ingestionTime": self.ingestionTime, + "message": self.message, + "timestamp": self.timestamp + } + class LogStream: _log_ids = 0 @@ -41,7 +48,14 @@ class LogStream: self.__class__._log_ids += 1 + def _update(self): + self.firstEventTimestamp = min([x.timestamp for x in self.events]) + self.lastEventTimestamp = max([x.timestamp for x in self.events]) + def to_describe_dict(self): + # Compute start and end times + self._update() + return { "arn": self.arn, "creationTime": self.creationTime, @@ -79,7 +93,7 @@ class LogStream: if next_token is None: next_token = 0 - events_page = events[next_token: next_token + limit] + events_page = [event.to_response_dict() for event in events[next_token: next_token + limit]] next_token += limit if next_token >= len(self.events): next_token = None @@ -120,17 +134,17 @@ class LogGroup: del self.streams[log_stream_name] def describe_log_streams(self, descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by): - log_streams = [stream.to_describe_dict() for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)] + log_streams = [(name, stream.to_describe_dict()) for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)] - def sorter(stream): - return stream.name if order_by == 'logStreamName' else stream.lastEventTimestamp + def sorter(item): + return item[0] if order_by == 'logStreamName' else item[1]['lastEventTimestamp'] if next_token is None: next_token = 0 log_streams = sorted(log_streams, key=sorter, reverse=descending) new_token = next_token + limit - log_streams_page = log_streams[next_token: new_token] + log_streams_page = [x[1] for x in log_streams[next_token: new_token]] if new_token >= len(log_streams): new_token = None diff --git a/moto/logs/responses.py b/moto/logs/responses.py index 4cb9caa6a..53b2390f4 100644 --- a/moto/logs/responses.py +++ b/moto/logs/responses.py @@ -47,7 +47,7 @@ class LogsResponse(BaseResponse): def describe_log_streams(self): log_group_name = self._get_param('logGroupName') - log_stream_name_prefix = self._get_param('logStreamNamePrefix') + log_stream_name_prefix = self._get_param('logStreamNamePrefix', '') descending = self._get_param('descending', False) limit = self._get_param('limit', 50) assert limit <= 50 @@ -83,7 +83,7 @@ class LogsResponse(BaseResponse): limit = self._get_param('limit', 10000) assert limit <= 10000 next_token = self._get_param('nextToken') - start_from_head = self._get_param('startFromHead') + start_from_head = self._get_param('startFromHead', False) events, next_backward_token, next_foward_token = \ self.logs_backend.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index ebe710760..acbe75e94 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -1,11 +1,24 @@ from __future__ import unicode_literals +import time +import datetime import boto3 from botocore.exceptions import ClientError import sure # noqa -from moto import mock_batch, mock_iam, mock_ec2, mock_ecs +from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs +import functools +import nose +def expected_failure(test): + @functools.wraps(test) + def inner(*args, **kwargs): + try: + test(*args, **kwargs) + except Exception as err: + raise nose.SkipTest + return inner + DEFAULT_REGION = 'eu-central-1' @@ -13,6 +26,7 @@ def _get_clients(): return boto3.client('ec2', region_name=DEFAULT_REGION), \ boto3.client('iam', region_name=DEFAULT_REGION), \ boto3.client('ecs', region_name=DEFAULT_REGION), \ + boto3.client('logs', region_name=DEFAULT_REGION), \ boto3.client('batch', region_name=DEFAULT_REGION) @@ -52,7 +66,7 @@ def _setup(ec2_client, iam_client): @mock_iam @mock_batch def test_create_managed_compute_environment(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -105,7 +119,7 @@ def test_create_managed_compute_environment(): @mock_iam @mock_batch def test_create_unmanaged_compute_environment(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -136,7 +150,7 @@ def test_create_unmanaged_compute_environment(): @mock_iam @mock_batch def test_describe_compute_environment(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -163,7 +177,7 @@ def test_describe_compute_environment(): @mock_iam @mock_batch def test_delete_unmanaged_compute_environment(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -190,7 +204,7 @@ def test_delete_unmanaged_compute_environment(): @mock_iam @mock_batch def test_delete_managed_compute_environment(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -247,7 +261,7 @@ def test_delete_managed_compute_environment(): @mock_iam @mock_batch def test_update_unmanaged_compute_environment_state(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -273,7 +287,7 @@ def test_update_unmanaged_compute_environment_state(): @mock_iam @mock_batch def test_create_job_queue(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -315,7 +329,7 @@ def test_create_job_queue(): @mock_iam @mock_batch def test_job_queue_bad_arn(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -348,7 +362,7 @@ def test_job_queue_bad_arn(): @mock_iam @mock_batch def test_update_job_queue(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -389,7 +403,7 @@ def test_update_job_queue(): @mock_iam @mock_batch def test_update_job_queue(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) compute_name = 'test_compute_env' @@ -428,7 +442,7 @@ def test_update_job_queue(): @mock_iam @mock_batch def test_register_task_definition(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) resp = batch_client.register_job_definition( @@ -455,7 +469,7 @@ def test_register_task_definition(): @mock_batch def test_reregister_task_definition(): # Reregistering task with the same name bumps the revision number - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) resp1 = batch_client.register_job_definition( @@ -496,7 +510,7 @@ def test_reregister_task_definition(): @mock_iam @mock_batch def test_delete_task_definition(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) resp = batch_client.register_job_definition( @@ -521,10 +535,10 @@ def test_delete_task_definition(): @mock_iam @mock_batch def test_describe_task_definition(): - ec2_client, iam_client, ecs_client, batch_client = _get_clients() + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) - resp = batch_client.register_job_definition( + batch_client.register_job_definition( jobDefinitionName='sleep10', type='container', containerProperties={ @@ -534,8 +548,7 @@ def test_describe_task_definition(): 'command': ['sleep', '10'] } ) - arn1 = resp['jobDefinitionArn'] - resp = batch_client.register_job_definition( + batch_client.register_job_definition( jobDefinitionName='sleep10', type='container', containerProperties={ @@ -545,8 +558,7 @@ def test_describe_task_definition(): 'command': ['sleep', '10'] } ) - arn2 = resp['jobDefinitionArn'] - resp = batch_client.register_job_definition( + batch_client.register_job_definition( jobDefinitionName='test1', type='container', containerProperties={ @@ -556,7 +568,6 @@ def test_describe_task_definition(): 'command': ['sleep', '10'] } ) - arn3 = resp['jobDefinitionArn'] resp = batch_client.describe_job_definitions( jobDefinitionName='sleep10' @@ -571,3 +582,76 @@ def test_describe_task_definition(): ) len(resp['jobDefinitions']).should.equal(3) + +# SLOW TEST +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_submit_job(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id = resp['jobId'] + + future = datetime.datetime.now() + datetime.timedelta(seconds=30) + + while datetime.datetime.now() < future: + resp = batch_client.describe_jobs(jobs=[job_id]) + print("{0}:{1} {2}".format(resp['jobs'][0]['jobName'], resp['jobs'][0]['jobId'], resp['jobs'][0]['status'])) + + if resp['jobs'][0]['status'] == 'FAILED': + raise RuntimeError('Batch job failed') + if resp['jobs'][0]['status'] == 'SUCCEEDED': + break + time.sleep(0.5) + else: + raise RuntimeError('Batch job timed out') + + resp = logs_client.describe_log_streams(logGroupName='/aws/batch/job') + len(resp['logStreams']).should.equal(1) + ls_name = resp['logStreams'][0]['logStreamName'] + + resp = logs_client.get_log_events(logGroupName='/aws/batch/job', logStreamName=ls_name) + len(resp['events']).should.be.greater_than(5) \ No newline at end of file From e3024ae1bad51c13cc18849d1d502670410f2940 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Wed, 11 Oct 2017 23:46:27 +0100 Subject: [PATCH 17/22] Implemented Terminate, Cancel and List jobs --- moto/batch/models.py | 52 +++++++++++ moto/batch/responses.py | 39 +++++++++ moto/batch/urls.py | 5 +- tests/test_batch/test_batch.py | 156 ++++++++++++++++++++++++++++++++- 4 files changed, 249 insertions(+), 3 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index be8fca9d1..7f75225f7 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -196,6 +196,7 @@ class Job(threading.Thread, BaseModel): self.job_started_at = datetime.datetime(1970, 1, 1) self.job_stopped_at = datetime.datetime(1970, 1, 1) self.job_stopped = False + self.job_stopped_reason = None self.stop = False @@ -230,6 +231,8 @@ class Job(threading.Thread, BaseModel): } if self.job_stopped: result['stoppedAt'] = datetime2int(self.job_stopped_at) + if self.job_stopped_reason is not None: + result['statusReason'] = self.job_stopped_reason return result def run(self): @@ -328,6 +331,11 @@ class Job(threading.Thread, BaseModel): self.job_stopped = True self.job_stopped_at = datetime.datetime.now() + def terminate(self, reason): + if not self.stop: + self.stop = True + self.job_stopped_reason = reason + class BatchBackend(BaseBackend): def __init__(self, region_name=None): @@ -478,6 +486,20 @@ class BatchBackend(BaseBackend): return result + def get_job_by_id(self, identifier): + """ + Get job by id + :param identifier: Job ID + :type identifier: str + + :return: Job + :rtype: Job + """ + try: + return self._jobs[identifier] + except KeyError: + return None + def describe_compute_environments(self, environments=None, max_results=None, next_token=None): envs = set() if environments is not None: @@ -916,6 +938,36 @@ class BatchBackend(BaseBackend): return result + def list_jobs(self, job_queue, job_status=None, max_results=None, next_token=None): + jobs = [] + + job_queue = self.get_job_queue(job_queue) + if job_queue is None: + raise ClientException('Job queue {0} does not exist'.format(job_queue)) + + if job_status is not None and job_status not in ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED'): + raise ClientException('Job status is not one of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED') + + for job in job_queue.jobs: + if job_status is not None and job.job_state != job_status: + continue + + jobs.append(job) + + return jobs + + def terminate_job(self, job_id, reason): + if job_id is None: + raise ClientException('Job ID does not exist') + if reason is None: + raise ClientException('Reason does not exist') + + job = self.get_job_by_id(job_id) + if job is None: + raise ClientException('Job not found') + + job.terminate(reason) + available_regions = boto3.session.Session().get_available_regions("batch") batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 2bec7ddf1..96094068d 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -263,3 +263,42 @@ class BatchResponse(BaseResponse): return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)}) except AWSError as err: return err.response() + + # ListJobs + def listjobs(self): + job_queue = self._get_param('jobQueue') + job_status = self._get_param('jobStatus') + max_results = self._get_param('maxResults') + next_token = self._get_param('nextToken') + + try: + jobs = self.batch_backend.list_jobs(job_queue, job_status, max_results, next_token) + except AWSError as err: + return err.response() + + result = {'jobSummaryList': [{'jobId': job.job_id, 'jobName': job.job_name} for job in jobs]} + return json.dumps(result) + + # TerminateJob + def terminatejob(self): + job_id = self._get_param('jobId') + reason = self._get_param('reason') + + try: + self.batch_backend.terminate_job(job_id, reason) + except AWSError as err: + return err.response() + + return '' + + # CancelJob + def canceljob(self): # Theres some AWS semantics on the differences but for us they're identical ;-) + job_id = self._get_param('jobId') + reason = self._get_param('reason') + + try: + self.batch_backend.terminate_job(job_id, reason) + except AWSError as err: + return err.response() + + return '' diff --git a/moto/batch/urls.py b/moto/batch/urls.py index 924e55e6d..c64086ef2 100644 --- a/moto/batch/urls.py +++ b/moto/batch/urls.py @@ -18,5 +18,8 @@ url_paths = { '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch, '{0}/v1/describejobdefinitions': BatchResponse.dispatch, '{0}/v1/submitjob': BatchResponse.dispatch, - '{0}/v1/describejobs': BatchResponse.dispatch + '{0}/v1/describejobs': BatchResponse.dispatch, + '{0}/v1/listjobs': BatchResponse.dispatch, + '{0}/v1/terminatejob': BatchResponse.dispatch, + '{0}/v1/canceljob': BatchResponse.dispatch, } diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py index acbe75e94..ec24cd911 100644 --- a/tests/test_batch/test_batch.py +++ b/tests/test_batch/test_batch.py @@ -583,7 +583,7 @@ def test_describe_task_definition(): len(resp['jobDefinitions']).should.equal(3) -# SLOW TEST +# SLOW TESTS @expected_failure @mock_logs @mock_ec2 @@ -654,4 +654,156 @@ def test_submit_job(): ls_name = resp['logStreams'][0]['logStreamName'] resp = logs_client.get_log_events(logGroupName='/aws/batch/job', logStreamName=ls_name) - len(resp['events']).should.be.greater_than(5) \ No newline at end of file + len(resp['events']).should.be.greater_than(5) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_list_jobs(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id1 = resp['jobId'] + resp = batch_client.submit_job( + jobName='test2', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id2 = resp['jobId'] + + future = datetime.datetime.now() + datetime.timedelta(seconds=30) + + resp_finished_jobs = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + # Wait only as long as it takes to run the jobs + while datetime.datetime.now() < future: + resp = batch_client.describe_jobs(jobs=[job_id1, job_id2]) + + any_failed_jobs = any([job['status'] == 'FAILED' for job in resp['jobs']]) + succeeded_jobs = all([job['status'] == 'SUCCEEDED' for job in resp['jobs']]) + + if any_failed_jobs: + raise RuntimeError('A Batch job failed') + if succeeded_jobs: + break + time.sleep(0.5) + else: + raise RuntimeError('Batch jobs timed out') + + resp_finished_jobs2 = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + len(resp_finished_jobs['jobSummaryList']).should.equal(0) + len(resp_finished_jobs2['jobSummaryList']).should.equal(2) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_terminate_job(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id = resp['jobId'] + + time.sleep(2) + + batch_client.terminate_job(jobId=job_id, reason='test_terminate') + + time.sleep(1) + + resp = batch_client.describe_jobs(jobs=[job_id]) + resp['jobs'][0]['jobName'].should.equal('test1') + resp['jobs'][0]['status'].should.equal('FAILED') + resp['jobs'][0]['statusReason'].should.equal('test_terminate') + From 453da4c8b349d9777bac7fbc4667b0a11188806e Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 20 Oct 2017 00:51:04 +0100 Subject: [PATCH 18/22] Added CreateEnvironment to cloudformation --- moto/batch/models.py | 46 ++++++++++-- moto/cloudformation/parsing.py | 4 + tests/test_batch/test_cloudformation.py | 98 +++++++++++++++++++++++++ 3 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 tests/test_batch/test_cloudformation.py diff --git a/moto/batch/models.py b/moto/batch/models.py index 7f75225f7..0fe3016ca 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -28,7 +28,7 @@ from moto.iam.exceptions import IAMNotFoundException _orig_adapter_send = requests.adapters.HTTPAdapter.send logger = logging.getLogger(__name__) DEFAULT_ACCOUNT_ID = 123456789012 -COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9_]{1,128}$') +COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$') def datetime2int(date): @@ -38,7 +38,7 @@ def datetime2int(date): class ComputeEnvironment(BaseModel): def __init__(self, compute_environment_name, _type, state, compute_resources, service_role, region_name): self.name = compute_environment_name - self.type = _type + self.env_type = _type self.state = state self.compute_resources = compute_resources self.service_role = service_role @@ -55,6 +55,33 @@ class ComputeEnvironment(BaseModel): self.ecs_arn = arn self.ecs_name = name + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + # Need to deal with difference case from cloudformation compute_resources, e.g. instanceRole vs InstanceRole + # Hacky fix to normalise keys + new_comp_res = {} + for key, value in properties['ComputeResources'].items(): + new_key = key[0].lower() + key[1:] + new_comp_res[new_key] = value + + env = backend.create_compute_environment( + resource_name, + properties['Type'], + properties.get('State', 'ENABLED'), + new_comp_res, + properties['ServiceRole'] + ) + arn = env[1] + + return backend.get_compute_environment_by_arn(arn) + class JobQueue(BaseModel): def __init__(self, name, priority, state, environments, env_order_json, region_name): @@ -517,10 +544,10 @@ class BatchBackend(BaseBackend): 'ecsClusterArn': environment.ecs_arn, 'serviceRole': environment.service_role, 'state': environment.state, - 'type': environment.type, + 'type': environment.env_type, 'status': 'VALID' } - if environment.type == 'MANAGED': + if environment.env_type == 'MANAGED': json_part['computeResources'] = environment.compute_resources result.append(json_part) @@ -530,7 +557,7 @@ class BatchBackend(BaseBackend): def create_compute_environment(self, compute_environment_name, _type, state, compute_resources, service_role): # Validate if COMPUTE_ENVIRONMENT_NAME_REGEX.match(compute_environment_name) is None: - raise InvalidParameterValueException('Compute environment name does not match ^[A-Za-z0-9_]{1,128}$') + raise InvalidParameterValueException('Compute environment name does not match ^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$') if self.get_compute_environment_by_name(compute_environment_name) is not None: raise InvalidParameterValueException('A compute environment already exists with the name {0}'.format(compute_environment_name)) @@ -617,7 +644,9 @@ class BatchBackend(BaseBackend): if len(cr['instanceTypes']) == 0: raise InvalidParameterValueException('At least 1 instance type must be provided') for instance_type in cr['instanceTypes']: - if instance_type not in EC2_INSTANCE_TYPES: + if instance_type == 'optimal': + pass # Optimal should pick from latest of current gen + elif instance_type not in EC2_INSTANCE_TYPES: raise InvalidParameterValueException('Instance type {0} does not exist'.format(instance_type)) for sec_id in cr['securityGroupIds']: @@ -657,6 +686,9 @@ class BatchBackend(BaseBackend): instances = [] for instance_type in instance_types: + if instance_type == 'optimal': + instance_type = 'm4.4xlarge' + instance_vcpus.append( (EC2_INSTANCE_TYPES[instance_type]['vcpus'], instance_type) ) @@ -700,7 +732,7 @@ class BatchBackend(BaseBackend): # Delete ECS cluster self.ecs_backend.delete_cluster(compute_env.ecs_name) - if compute_env.type == 'MANAGED': + if compute_env.env_type == 'MANAGED': # Delete compute envrionment instance_ids = [instance.id for instance in compute_env.instances] self.ec2_backend.terminate_instances(instance_ids) diff --git a/moto/cloudformation/parsing.py b/moto/cloudformation/parsing.py index 923ada058..05a408be1 100644 --- a/moto/cloudformation/parsing.py +++ b/moto/cloudformation/parsing.py @@ -8,6 +8,7 @@ import re from moto.autoscaling import models as autoscaling_models from moto.awslambda import models as lambda_models +from moto.batch import models as batch_models from moto.cloudwatch import models as cloudwatch_models from moto.datapipeline import models as datapipeline_models from moto.dynamodb import models as dynamodb_models @@ -31,6 +32,9 @@ from boto.cloudformation.stack import Output MODEL_MAP = { "AWS::AutoScaling::AutoScalingGroup": autoscaling_models.FakeAutoScalingGroup, "AWS::AutoScaling::LaunchConfiguration": autoscaling_models.FakeLaunchConfiguration, + "AWS::Batch::JobDefinition": batch_models.JobDefinition, + "AWS::Batch::JobQueue": batch_models.JobQueue, + "AWS::Batch::ComputeEnvironment": batch_models.ComputeEnvironment, "AWS::DynamoDB::Table": dynamodb_models.Table, "AWS::Kinesis::Stream": kinesis_models.Stream, "AWS::Lambda::EventSourceMapping": lambda_models.EventSourceMapping, diff --git a/tests/test_batch/test_cloudformation.py b/tests/test_batch/test_cloudformation.py new file mode 100644 index 000000000..b0203af93 --- /dev/null +++ b/tests/test_batch/test_cloudformation.py @@ -0,0 +1,98 @@ +from __future__ import unicode_literals + +import time +import datetime +import boto3 +from botocore.exceptions import ClientError +import sure # noqa +from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs, mock_cloudformation +import functools +import nose +import json + +DEFAULT_REGION = 'eu-central-1' + + +def _get_clients(): + return boto3.client('ec2', region_name=DEFAULT_REGION), \ + boto3.client('iam', region_name=DEFAULT_REGION), \ + boto3.client('ecs', region_name=DEFAULT_REGION), \ + boto3.client('logs', region_name=DEFAULT_REGION), \ + boto3.client('batch', region_name=DEFAULT_REGION) + + +def _setup(ec2_client, iam_client): + """ + Do prerequisite setup + :return: VPC ID, Subnet ID, Security group ID, IAM Role ARN + :rtype: tuple + """ + resp = ec2_client.create_vpc(CidrBlock='172.30.0.0/24') + vpc_id = resp['Vpc']['VpcId'] + resp = ec2_client.create_subnet( + AvailabilityZone='eu-central-1a', + CidrBlock='172.30.0.0/25', + VpcId=vpc_id + ) + subnet_id = resp['Subnet']['SubnetId'] + resp = ec2_client.create_security_group( + Description='test_sg_desc', + GroupName='test_sg', + VpcId=vpc_id + ) + sg_id = resp['GroupId'] + + resp = iam_client.create_role( + RoleName='TestRole', + AssumeRolePolicyDocument='some_policy' + ) + iam_arn = resp['Role']['Arn'] + + return vpc_id, subnet_id, sg_id, iam_arn + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_env_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + } + } + } + cf_json = json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + + stack_resources['StackResourceSummaries'][0]['ResourceStatus'].should.equal('CREATE_COMPLETE') + stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].startswith('arn:aws:batch:') + stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].should.contain('test_stack') From 9805a279c7077c74a34b4017e2938ef354c07998 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 20 Oct 2017 01:06:30 +0100 Subject: [PATCH 19/22] Added JobQueue to cloudformation --- moto/batch/models.py | 29 +++++++++++ tests/test_batch/test_cloudformation.py | 66 +++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/moto/batch/models.py b/moto/batch/models.py index 0fe3016ca..14572dd78 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -121,6 +121,35 @@ class JobQueue(BaseModel): return result + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + # Need to deal with difference case from cloudformation compute_resources, e.g. instanceRole vs InstanceRole + # Hacky fix to normalise keys, is making me think I want to start spamming cAsEiNsEnSiTiVe dictionaries + compute_envs = [] + for compute_env in properties['ComputeEnvironmentOrder']: + tmp_compute_env_order = {} + for key, value in compute_env.items(): + new_key = key[0].lower() + key[1:] + tmp_compute_env_order[new_key] = value + compute_envs.append(tmp_compute_env_order) + + queue = backend.create_job_queue( + queue_name=resource_name, + priority=properties['Priority'], + state=properties.get('State', 'ENABLED'), + compute_env_order=compute_envs + ) + arn = queue[1] + + return backend.get_job_queue_by_arn(arn) + class JobDefinition(BaseModel): def __init__(self, name, parameters, _type, container_properties, region_name, revision=0, retry_strategy=0): diff --git a/tests/test_batch/test_cloudformation.py b/tests/test_batch/test_cloudformation.py index b0203af93..bc9bd53e4 100644 --- a/tests/test_batch/test_cloudformation.py +++ b/tests/test_batch/test_cloudformation.py @@ -94,5 +94,71 @@ def test_create_env_cf(): stack_resources = cf_conn.list_stack_resources(StackName=stack_id) stack_resources['StackResourceSummaries'][0]['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].startswith('arn:aws:batch:') stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].should.contain('test_stack') + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + }, + + "JobQueue": { + "Type": "AWS::Batch::JobQueue", + "Properties": { + "Priority": 1, + "ComputeEnvironmentOrder": [ + { + "Order": 1, + "ComputeEnvironment": {"Ref": "ComputeEnvironment"} + } + ] + } + }, + } + } + cf_json = json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + len(stack_resources['StackResourceSummaries']).should.equal(2) + + job_queue_resource = list(filter(lambda item: item['ResourceType'] == 'AWS::Batch::JobQueue', stack_resources['StackResourceSummaries']))[0] + + job_queue_resource['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN + job_queue_resource['PhysicalResourceId'].startswith('arn:aws:batch:') + job_queue_resource['PhysicalResourceId'].should.contain('test_stack') + job_queue_resource['PhysicalResourceId'].should.contain('job-queue/') From 629503398c097c17f1ba83d3c842d86638eaffda Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Fri, 20 Oct 2017 19:10:31 +0100 Subject: [PATCH 20/22] Added JobDefinition to cloudformation --- moto/batch/models.py | 40 +++++++----- moto/batch/utils.py | 9 +++ tests/test_batch/test_cloudformation.py | 83 +++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 16 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index 14572dd78..8b3b81ccb 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -19,7 +19,7 @@ from moto.ecs import ecs_backends from moto.logs import logs_backends from .exceptions import InvalidParameterValueException, InternalFailure, ClientException -from .utils import make_arn_for_compute_env, make_arn_for_job_queue, make_arn_for_task_def +from .utils import make_arn_for_compute_env, make_arn_for_job_queue, make_arn_for_task_def, lowercase_first_key from moto.ec2.exceptions import InvalidSubnetIdError from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES from moto.iam.exceptions import IAMNotFoundException @@ -64,18 +64,11 @@ class ComputeEnvironment(BaseModel): backend = batch_backends[region_name] properties = cloudformation_json['Properties'] - # Need to deal with difference case from cloudformation compute_resources, e.g. instanceRole vs InstanceRole - # Hacky fix to normalise keys - new_comp_res = {} - for key, value in properties['ComputeResources'].items(): - new_key = key[0].lower() + key[1:] - new_comp_res[new_key] = value - env = backend.create_compute_environment( resource_name, properties['Type'], properties.get('State', 'ENABLED'), - new_comp_res, + lowercase_first_key(properties['ComputeResources']), properties['ServiceRole'] ) arn = env[1] @@ -132,13 +125,7 @@ class JobQueue(BaseModel): # Need to deal with difference case from cloudformation compute_resources, e.g. instanceRole vs InstanceRole # Hacky fix to normalise keys, is making me think I want to start spamming cAsEiNsEnSiTiVe dictionaries - compute_envs = [] - for compute_env in properties['ComputeEnvironmentOrder']: - tmp_compute_env_order = {} - for key, value in compute_env.items(): - new_key = key[0].lower() + key[1:] - tmp_compute_env_order[new_key] = value - compute_envs.append(tmp_compute_env_order) + compute_envs = [lowercase_first_key(dict_item) for dict_item in properties['ComputeEnvironmentOrder']] queue = backend.create_job_queue( queue_name=resource_name, @@ -228,6 +215,27 @@ class JobDefinition(BaseModel): return result + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + res = backend.register_job_definition( + def_name=resource_name, + parameters=lowercase_first_key(properties.get('Parameters', {})), + _type='container', + retry_strategy=lowercase_first_key(properties['RetryStrategy']), + container_properties=lowercase_first_key(properties['ContainerProperties']) + ) + + arn = res[1] + + return backend.get_job_definition_by_arn(arn) + class Job(threading.Thread, BaseModel): def __init__(self, name, job_def, job_queue, log_backend): diff --git a/moto/batch/utils.py b/moto/batch/utils.py index 6cdd381f7..829a55f12 100644 --- a/moto/batch/utils.py +++ b/moto/batch/utils.py @@ -11,3 +11,12 @@ def make_arn_for_job_queue(account_id, name, region_name): def make_arn_for_task_def(account_id, name, revision, region_name): return "arn:aws:batch:{0}:{1}:job-definition/{2}:{3}".format(region_name, account_id, name, revision) + + +def lowercase_first_key(some_dict): + new_dict = {} + for key, value in some_dict.items(): + new_key = key[0].lower() + key[1:] + new_dict[new_key] = value + + return new_dict diff --git a/tests/test_batch/test_cloudformation.py b/tests/test_batch/test_cloudformation.py index bc9bd53e4..1e37aa3a6 100644 --- a/tests/test_batch/test_cloudformation.py +++ b/tests/test_batch/test_cloudformation.py @@ -162,3 +162,86 @@ def test_create_job_queue_cf(): job_queue_resource['PhysicalResourceId'].startswith('arn:aws:batch:') job_queue_resource['PhysicalResourceId'].should.contain('test_stack') job_queue_resource['PhysicalResourceId'].should.contain('job-queue/') + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_def_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + }, + + "JobQueue": { + "Type": "AWS::Batch::JobQueue", + "Properties": { + "Priority": 1, + "ComputeEnvironmentOrder": [ + { + "Order": 1, + "ComputeEnvironment": {"Ref": "ComputeEnvironment"} + } + ] + } + }, + + "JobDefinition": { + "Type": "AWS::Batch::JobDefinition", + "Properties": { + "Type": "container", + "ContainerProperties": { + "Image": { + "Fn::Join": ["", ["137112412989.dkr.ecr.", {"Ref": "AWS::Region"}, ".amazonaws.com/amazonlinux:latest"]] + }, + "Vcpus": 2, + "Memory": 2000, + "Command": ["echo", "Hello world"] + }, + "RetryStrategy": { + "Attempts": 1 + } + } + }, + } + } + cf_json = json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + len(stack_resources['StackResourceSummaries']).should.equal(3) + + job_def_resource = list(filter(lambda item: item['ResourceType'] == 'AWS::Batch::JobDefinition', stack_resources['StackResourceSummaries']))[0] + + job_def_resource['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN + job_def_resource['PhysicalResourceId'].startswith('arn:aws:batch:') + job_def_resource['PhysicalResourceId'].should.contain('test_stack-JobDef') + job_def_resource['PhysicalResourceId'].should.contain('job-definition/') From ca3a3633e9a1448e20f11ae026460a468fe9c0b6 Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Sun, 22 Oct 2017 13:36:23 +0100 Subject: [PATCH 21/22] Called terminatejob from canceljob --- moto/batch/responses.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 96094068d..e626b7d4c 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -293,12 +293,4 @@ class BatchResponse(BaseResponse): # CancelJob def canceljob(self): # Theres some AWS semantics on the differences but for us they're identical ;-) - job_id = self._get_param('jobId') - reason = self._get_param('reason') - - try: - self.batch_backend.terminate_job(job_id, reason) - except AWSError as err: - return err.response() - - return '' + return self.terminatejob() From fbc984933bce2caeb3855d8f0a747b6a447ac0da Mon Sep 17 00:00:00 2001 From: Terry Cain Date: Sun, 22 Oct 2017 21:36:39 +0100 Subject: [PATCH 22/22] Added server test --- tests/test_batch/test_server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_batch/test_server.py b/tests/test_batch/test_server.py index 7c0d2b3a1..4a74260a8 100644 --- a/tests/test_batch/test_server.py +++ b/tests/test_batch/test_server.py @@ -9,8 +9,11 @@ from moto import mock_batch Test the different server responses ''' + @mock_batch def test_batch_list(): backend = server.create_backend_app("batch") test_client = backend.test_client() - # do test \ No newline at end of file + + res = test_client.get('/v1/describecomputeenvironments') + res.status_code.should.equal(200)