diff --git a/moto/__init__.py b/moto/__init__.py index 79efac862..0c0358324 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -40,6 +40,7 @@ from .route53 import mock_route53, mock_route53_deprecated # flake8: noqa from .swf import mock_swf, mock_swf_deprecated # flake8: noqa from .xray import mock_xray, mock_xray_client, XRaySegment # flake8: noqa from .logs import mock_logs, mock_logs_deprecated # flake8: noqa +from .batch import mock_batch # flake8: noqa try: diff --git a/moto/backends.py b/moto/backends.py index 24a8b6c2b..d1ce0730e 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -35,11 +35,13 @@ from moto.sqs import sqs_backends from moto.ssm import ssm_backends from moto.sts import sts_backends from moto.xray import xray_backends +from moto.batch import batch_backends BACKENDS = { 'acm': acm_backends, 'apigateway': apigateway_backends, 'autoscaling': autoscaling_backends, + 'batch': batch_backends, 'cloudformation': cloudformation_backends, 'cloudwatch': cloudwatch_backends, 'datapipeline': datapipeline_backends, diff --git a/moto/batch/__init__.py b/moto/batch/__init__.py new file mode 100644 index 000000000..6002b6fc7 --- /dev/null +++ b/moto/batch/__init__.py @@ -0,0 +1,6 @@ +from __future__ import unicode_literals +from .models import batch_backends +from ..core.models import base_decorator + +batch_backend = batch_backends['us-east-1'] +mock_batch = base_decorator(batch_backends) diff --git a/moto/batch/exceptions.py b/moto/batch/exceptions.py new file mode 100644 index 000000000..a71e54ce3 --- /dev/null +++ b/moto/batch/exceptions.py @@ -0,0 +1,37 @@ +from __future__ import unicode_literals +import json + + +class AWSError(Exception): + CODE = None + STATUS = 400 + + def __init__(self, message, code=None, status=None): + self.message = message + self.code = code if code is not None else self.CODE + self.status = status if status is not None else self.STATUS + + def response(self): + return json.dumps({'__type': self.code, 'message': self.message}), dict(status=self.status) + + +class InvalidRequestException(AWSError): + CODE = 'InvalidRequestException' + + +class InvalidParameterValueException(AWSError): + CODE = 'InvalidParameterValue' + + +class ValidationError(AWSError): + CODE = 'ValidationError' + + +class InternalFailure(AWSError): + CODE = 'InternalFailure' + STATUS = 500 + + +class ClientException(AWSError): + CODE = 'ClientException' + STATUS = 400 diff --git a/moto/batch/models.py b/moto/batch/models.py new file mode 100644 index 000000000..8b3b81ccb --- /dev/null +++ b/moto/batch/models.py @@ -0,0 +1,1042 @@ +from __future__ import unicode_literals +import boto3 +import re +import requests.adapters +from itertools import cycle +import six +import datetime +import time +import uuid +import logging +import docker +import functools +import threading +import dateutil.parser +from moto.core import BaseBackend, BaseModel +from moto.iam import iam_backends +from moto.ec2 import ec2_backends +from moto.ecs import ecs_backends +from moto.logs import logs_backends + +from .exceptions import InvalidParameterValueException, InternalFailure, ClientException +from .utils import make_arn_for_compute_env, make_arn_for_job_queue, make_arn_for_task_def, lowercase_first_key +from moto.ec2.exceptions import InvalidSubnetIdError +from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES +from moto.iam.exceptions import IAMNotFoundException + + +_orig_adapter_send = requests.adapters.HTTPAdapter.send +logger = logging.getLogger(__name__) +DEFAULT_ACCOUNT_ID = 
123456789012 +COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(r'^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$') + + +def datetime2int(date): + return int(time.mktime(date.timetuple())) + + +class ComputeEnvironment(BaseModel): + def __init__(self, compute_environment_name, _type, state, compute_resources, service_role, region_name): + self.name = compute_environment_name + self.env_type = _type + self.state = state + self.compute_resources = compute_resources + self.service_role = service_role + self.arn = make_arn_for_compute_env(DEFAULT_ACCOUNT_ID, compute_environment_name, region_name) + + self.instances = [] + self.ecs_arn = None + self.ecs_name = None + + def add_instance(self, instance): + self.instances.append(instance) + + def set_ecs(self, arn, name): + self.ecs_arn = arn + self.ecs_name = name + + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + env = backend.create_compute_environment( + resource_name, + properties['Type'], + properties.get('State', 'ENABLED'), + lowercase_first_key(properties['ComputeResources']), + properties['ServiceRole'] + ) + arn = env[1] + + return backend.get_compute_environment_by_arn(arn) + + +class JobQueue(BaseModel): + def __init__(self, name, priority, state, environments, env_order_json, region_name): + """ + :param name: Job queue name + :type name: str + :param priority: Job queue priority + :type priority: int + :param state: Either ENABLED or DISABLED + :type state: str + :param environments: Compute Environments + :type environments: list of ComputeEnvironment + :param env_order_json: Compute Environments JSON for use when describing + :type env_order_json: list of dict + :param region_name: Region name + :type region_name: str + """ + self.name = name + self.priority = priority + self.state = state + self.environments = environments + self.env_order_json = env_order_json + self.arn = make_arn_for_job_queue(DEFAULT_ACCOUNT_ID, name, region_name) + self.status = 'VALID' + + self.jobs = [] + + def describe(self): + result = { + 'computeEnvironmentOrder': self.env_order_json, + 'jobQueueArn': self.arn, + 'jobQueueName': self.name, + 'priority': self.priority, + 'state': self.state, + 'status': self.status + } + + return result + + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + # Need to deal with difference case from cloudformation compute_resources, e.g. 
instanceRole vs InstanceRole + # Hacky fix to normalise keys, is making me think I want to start spamming cAsEiNsEnSiTiVe dictionaries + compute_envs = [lowercase_first_key(dict_item) for dict_item in properties['ComputeEnvironmentOrder']] + + queue = backend.create_job_queue( + queue_name=resource_name, + priority=properties['Priority'], + state=properties.get('State', 'ENABLED'), + compute_env_order=compute_envs + ) + arn = queue[1] + + return backend.get_job_queue_by_arn(arn) + + +class JobDefinition(BaseModel): + def __init__(self, name, parameters, _type, container_properties, region_name, revision=0, retry_strategy=0): + self.name = name + self.retries = retry_strategy + self.type = _type + self.revision = revision + self._region = region_name + self.container_properties = container_properties + self.arn = None + self.status = 'INACTIVE' + + if parameters is None: + parameters = {} + self.parameters = parameters + + self._validate() + self._update_arn() + + def _update_arn(self): + self.revision += 1 + self.arn = make_arn_for_task_def(DEFAULT_ACCOUNT_ID, self.name, self.revision, self._region) + + def _validate(self): + if self.type not in ('container',): + raise ClientException('type must be one of "container"') + + # For future use when containers arnt the only thing in batch + if self.type != 'container': + raise NotImplementedError() + + if not isinstance(self.parameters, dict): + raise ClientException('parameters must be a string to string map') + + if 'image' not in self.container_properties: + raise ClientException('containerProperties must contain image') + + if 'memory' not in self.container_properties: + raise ClientException('containerProperties must contain memory') + if self.container_properties['memory'] < 4: + raise ClientException('container memory limit must be greater than 4') + + if 'vcpus' not in self.container_properties: + raise ClientException('containerProperties must contain vcpus') + if self.container_properties['vcpus'] < 1: + raise ClientException('container vcpus limit must be greater than 0') + + def update(self, parameters, _type, container_properties, retry_strategy): + if parameters is None: + parameters = self.parameters + + if _type is None: + _type = self.type + + if container_properties is None: + container_properties = self.container_properties + + if retry_strategy is None: + retry_strategy = self.retries + + return JobDefinition(self.name, parameters, _type, container_properties, region_name=self._region, revision=self.revision, retry_strategy=retry_strategy) + + def describe(self): + result = { + 'jobDefinitionArn': self.arn, + 'jobDefinitionName': self.name, + 'parameters': self.parameters, + 'revision': self.revision, + 'status': self.status, + 'type': self.type + } + if self.container_properties is not None: + result['containerProperties'] = self.container_properties + if self.retries is not None and self.retries > 0: + result['retryStrategy'] = {'attempts': self.retries} + + return result + + @property + def physical_resource_id(self): + return self.arn + + @classmethod + def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name): + backend = batch_backends[region_name] + properties = cloudformation_json['Properties'] + + res = backend.register_job_definition( + def_name=resource_name, + parameters=lowercase_first_key(properties.get('Parameters', {})), + _type='container', + retry_strategy=lowercase_first_key(properties['RetryStrategy']), + 
container_properties=lowercase_first_key(properties['ContainerProperties']) + ) + + arn = res[1] + + return backend.get_job_definition_by_arn(arn) + + +class Job(threading.Thread, BaseModel): + def __init__(self, name, job_def, job_queue, log_backend): + """ + Docker Job + + :param name: Job Name + :param job_def: Job definition + :type: job_def: JobDefinition + :param job_queue: Job Queue + :param log_backend: Log backend + :type log_backend: moto.logs.models.LogsBackend + """ + threading.Thread.__init__(self) + + self.job_name = name + self.job_id = str(uuid.uuid4()) + self.job_definition = job_def + self.job_queue = job_queue + self.job_state = 'SUBMITTED' # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED + self.job_queue.jobs.append(self) + self.job_started_at = datetime.datetime(1970, 1, 1) + self.job_stopped_at = datetime.datetime(1970, 1, 1) + self.job_stopped = False + self.job_stopped_reason = None + + self.stop = False + + self.daemon = True + self.name = 'MOTO-BATCH-' + self.job_id + + self.docker_client = docker.from_env() + self._log_backend = log_backend + + # Unfortunately mocking replaces this method w/o fallback enabled, so we + # need to replace it if we detect it's been mocked + if requests.adapters.HTTPAdapter.send != _orig_adapter_send: + _orig_get_adapter = self.docker_client.api.get_adapter + + def replace_adapter_send(*args, **kwargs): + adapter = _orig_get_adapter(*args, **kwargs) + + if isinstance(adapter, requests.adapters.HTTPAdapter): + adapter.send = functools.partial(_orig_adapter_send, adapter) + return adapter + self.docker_client.api.get_adapter = replace_adapter_send + + def describe(self): + result = { + 'jobDefinition': self.job_definition.arn, + 'jobId': self.job_id, + 'jobName': self.job_name, + 'jobQueue': self.job_queue.arn, + 'startedAt': datetime2int(self.job_started_at), + 'status': self.job_state, + 'dependsOn': [] + } + if self.job_stopped: + result['stoppedAt'] = datetime2int(self.job_stopped_at) + if self.job_stopped_reason is not None: + result['statusReason'] = self.job_stopped_reason + return result + + def run(self): + """ + Run the container. + + Logic is as follows: + Generate container info (eventually from task definition) + Start container + Loop whilst not asked to stop and the container is running. + Get all logs from container between the last time I checked and now. + Convert logs into cloudwatch format + Put logs into cloudwatch + + :return: + """ + try: + self.job_state = 'PENDING' + time.sleep(1) + + image = 'alpine:latest' + cmd = '/bin/sh -c "for a in `seq 1 10`; do echo Hello World; sleep 1; done"' + name = '{0}-{1}'.format(self.job_name, self.job_id) + + self.job_state = 'RUNNABLE' + # TODO setup ecs container instance + time.sleep(1) + + self.job_state = 'STARTING' + container = self.docker_client.containers.run( + image, cmd, + detach=True, + name=name + ) + self.job_state = 'RUNNING' + self.job_started_at = datetime.datetime.now() + try: + # Log collection + logs_stdout = [] + logs_stderr = [] + container.reload() + + # Dodgy hack, we can only check docker logs once a second, but we want to loop more + # so we can stop if asked to in a quick manner, should all go away if we go async + # There also be some dodgyness when sending an integer to docker logs and some + # events seem to be duplicated. 
+ now = datetime.datetime.now() + i = 1 + while container.status == 'running' and not self.stop: + time.sleep(0.15) + if i % 10 == 0: + logs_stderr.extend(container.logs(stdout=False, stderr=True, timestamps=True, since=datetime2int(now)).decode().split('\n')) + logs_stdout.extend(container.logs(stdout=True, stderr=False, timestamps=True, since=datetime2int(now)).decode().split('\n')) + now = datetime.datetime.now() + container.reload() + i += 1 + + # Container should be stopped by this point... unless asked to stop + if container.status == 'running': + container.kill() + + self.job_stopped_at = datetime.datetime.now() + # Get final logs + logs_stderr.extend(container.logs(stdout=False, stderr=True, timestamps=True, since=datetime2int(now)).decode().split('\n')) + logs_stdout.extend(container.logs(stdout=True, stderr=False, timestamps=True, since=datetime2int(now)).decode().split('\n')) + + self.job_state = 'SUCCEEDED' if not self.stop else 'FAILED' + + # Process logs + logs_stdout = [x for x in logs_stdout if len(x) > 0] + logs_stderr = [x for x in logs_stderr if len(x) > 0] + logs = [] + for line in logs_stdout + logs_stderr: + date, line = line.split(' ', 1) + date = dateutil.parser.parse(date) + date = int(date.timestamp()) + logs.append({'timestamp': date, 'message': line.strip()}) + + # Send to cloudwatch + log_group = '/aws/batch/job' + stream_name = '{0}/default/{1}'.format(self.job_definition.name, self.job_id) + self._log_backend.ensure_log_group(log_group, None) + self._log_backend.create_log_stream(log_group, stream_name) + self._log_backend.put_log_events(log_group, stream_name, logs, None) + + except Exception as err: + logger.error('Failed to run AWS Batch container {0}. Error {1}'.format(self.name, err)) + self.job_state = 'FAILED' + container.kill() + finally: + container.remove() + except Exception as err: + logger.error('Failed to run AWS Batch container {0}. 
Error {1}'.format(self.name, err)) + self.job_state = 'FAILED' + + self.job_stopped = True + self.job_stopped_at = datetime.datetime.now() + + def terminate(self, reason): + if not self.stop: + self.stop = True + self.job_stopped_reason = reason + + +class BatchBackend(BaseBackend): + def __init__(self, region_name=None): + super(BatchBackend, self).__init__() + self.region_name = region_name + + self._compute_environments = {} + self._job_queues = {} + self._job_definitions = {} + self._jobs = {} + + @property + def iam_backend(self): + """ + :return: IAM Backend + :rtype: moto.iam.models.IAMBackend + """ + return iam_backends['global'] + + @property + def ec2_backend(self): + """ + :return: EC2 Backend + :rtype: moto.ec2.models.EC2Backend + """ + return ec2_backends[self.region_name] + + @property + def ecs_backend(self): + """ + :return: ECS Backend + :rtype: moto.ecs.models.EC2ContainerServiceBackend + """ + return ecs_backends[self.region_name] + + @property + def logs_backend(self): + """ + :return: ECS Backend + :rtype: moto.logs.models.LogsBackend + """ + return logs_backends[self.region_name] + + def reset(self): + region_name = self.region_name + + for job in self._jobs.values(): + if job.job_state not in ('FAILED', 'SUCCEEDED'): + job.stop = True + # Try to join + job.join(0.2) + + self.__dict__ = {} + self.__init__(region_name) + + def get_compute_environment_by_arn(self, arn): + return self._compute_environments.get(arn) + + def get_compute_environment_by_name(self, name): + for comp_env in self._compute_environments.values(): + if comp_env.name == name: + return comp_env + return None + + def get_compute_environment(self, identifier): + """ + Get compute environment by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Compute Environment or None + :rtype: ComputeEnvironment or None + """ + env = self.get_compute_environment_by_arn(identifier) + if env is None: + env = self.get_compute_environment_by_name(identifier) + return env + + def get_job_queue_by_arn(self, arn): + return self._job_queues.get(arn) + + def get_job_queue_by_name(self, name): + for comp_env in self._job_queues.values(): + if comp_env.name == name: + return comp_env + return None + + def get_job_queue(self, identifier): + """ + Get job queue by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job Queue or None + :rtype: JobQueue or None + """ + env = self.get_job_queue_by_arn(identifier) + if env is None: + env = self.get_job_queue_by_name(identifier) + return env + + def get_job_definition_by_arn(self, arn): + return self._job_definitions.get(arn) + + def get_job_definition_by_name(self, name): + for comp_env in self._job_definitions.values(): + if comp_env.name == name: + return comp_env + return None + + def get_job_definition_by_name_revision(self, name, revision): + for job_def in self._job_definitions.values(): + if job_def.name == name and job_def.revision == revision: + return job_def + return None + + def get_job_definition(self, identifier): + """ + Get job defintiion by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job definition or None + :rtype: JobDefinition or None + """ + env = self.get_job_definition_by_arn(identifier) + if env is None: + env = self.get_job_definition_by_name(identifier) + return env + + def get_job_definitions(self, identifier): + """ + Get job defintiion by name or ARN + :param identifier: Name or ARN + :type identifier: str + + :return: Job definition or None + :rtype: 
list of JobDefinition
+        """
+        result = []
+        env = self.get_job_definition_by_arn(identifier)
+        if env is not None:
+            result.append(env)
+        else:
+            for value in self._job_definitions.values():
+                if value.name == identifier:
+                    result.append(value)
+
+        return result
+
+    def get_job_by_id(self, identifier):
+        """
+        Get job by id
+        :param identifier: Job ID
+        :type identifier: str
+
+        :return: Job
+        :rtype: Job
+        """
+        try:
+            return self._jobs[identifier]
+        except KeyError:
+            return None
+
+    def describe_compute_environments(self, environments=None, max_results=None, next_token=None):
+        envs = set()
+        if environments is not None:
+            envs = set(environments)
+
+        result = []
+        for arn, environment in self._compute_environments.items():
+            # Filter shortcut
+            if len(envs) > 0 and arn not in envs and environment.name not in envs:
+                continue
+
+            json_part = {
+                'computeEnvironmentArn': arn,
+                'computeEnvironmentName': environment.name,
+                'ecsClusterArn': environment.ecs_arn,
+                'serviceRole': environment.service_role,
+                'state': environment.state,
+                'type': environment.env_type,
+                'status': 'VALID'
+            }
+            if environment.env_type == 'MANAGED':
+                json_part['computeResources'] = environment.compute_resources
+
+            result.append(json_part)
+
+        return result
+
+    def create_compute_environment(self, compute_environment_name, _type, state, compute_resources, service_role):
+        # Validate
+        if COMPUTE_ENVIRONMENT_NAME_REGEX.match(compute_environment_name) is None:
+            raise InvalidParameterValueException('Compute environment name does not match ^[A-Za-z0-9][A-Za-z0-9_-]{1,126}[A-Za-z0-9]$')
+
+        if self.get_compute_environment_by_name(compute_environment_name) is not None:
+            raise InvalidParameterValueException('A compute environment already exists with the name {0}'.format(compute_environment_name))
+
+        # Look for IAM role
+        try:
+            self.iam_backend.get_role_by_arn(service_role)
+        except IAMNotFoundException:
+            raise InvalidParameterValueException('Could not find IAM role {0}'.format(service_role))
+
+        if _type not in ('MANAGED', 'UNMANAGED'):
+            raise InvalidParameterValueException('type {0} must be one of MANAGED | UNMANAGED'.format(_type))
+
+        if state is not None and state not in ('ENABLED', 'DISABLED'):
+            raise InvalidParameterValueException('state {0} must be one of ENABLED | DISABLED'.format(state))
+
+        if compute_resources is None and _type == 'MANAGED':
+            raise InvalidParameterValueException('computeResources must be specified when creating a MANAGED environment')
+        elif compute_resources is not None:
+            self._validate_compute_resources(compute_resources)
+
+        # By here, all values except SPOT ones have been validated
+        new_comp_env = ComputeEnvironment(
+            compute_environment_name, _type, state,
+            compute_resources, service_role,
+            region_name=self.region_name
+        )
+        self._compute_environments[new_comp_env.arn] = new_comp_env
+
+        # By this point everything is legit, so if it's MANAGED then start some instances
+        if _type == 'MANAGED':
+            cpus = int(compute_resources.get('desiredvCpus', compute_resources['minvCpus']))
+            instance_types = compute_resources['instanceTypes']
+            needed_instance_types = self.find_min_instances_to_meet_vcpus(instance_types, cpus)
+            # Create instances
+
+            # Will loop over and over so we get decent subnet coverage
+            subnet_cycle = cycle(compute_resources['subnets'])
+
+            for instance_type in needed_instance_types:
+                reservation = self.ec2_backend.add_instances(
+                    image_id='ami-ecs-optimised',  # Todo import AMIs
+                    count=1,
+                    user_data=None,
+                    security_group_names=[],
instance_type=instance_type, + region_name=self.region_name, + subnet_id=six.next(subnet_cycle), + key_name=compute_resources.get('ec2KeyPair', 'AWS_OWNED'), + security_group_ids=compute_resources['securityGroupIds'] + ) + + new_comp_env.add_instance(reservation.instances[0]) + + # Create ECS cluster + # Should be of format P2OnDemand_Batch_UUID + cluster_name = 'OnDemand_Batch_' + str(uuid.uuid4()) + ecs_cluster = self.ecs_backend.create_cluster(cluster_name) + new_comp_env.set_ecs(ecs_cluster.arn, cluster_name) + + return compute_environment_name, new_comp_env.arn + + def _validate_compute_resources(self, cr): + """ + Checks contents of sub dictionary for managed clusters + + :param cr: computeResources + :type cr: dict + """ + for param in ('instanceRole', 'maxvCpus', 'minvCpus', 'instanceTypes', 'securityGroupIds', 'subnets', 'type'): + if param not in cr: + raise InvalidParameterValueException('computeResources must contain {0}'.format(param)) + + if self.iam_backend.get_role_by_arn(cr['instanceRole']) is None: + raise InvalidParameterValueException('could not find instanceRole {0}'.format(cr['instanceRole'])) + + if cr['maxvCpus'] < 0: + raise InvalidParameterValueException('maxVCpus must be positive') + if cr['minvCpus'] < 0: + raise InvalidParameterValueException('minVCpus must be positive') + if cr['maxvCpus'] < cr['minvCpus']: + raise InvalidParameterValueException('maxVCpus must be greater than minvCpus') + + if len(cr['instanceTypes']) == 0: + raise InvalidParameterValueException('At least 1 instance type must be provided') + for instance_type in cr['instanceTypes']: + if instance_type == 'optimal': + pass # Optimal should pick from latest of current gen + elif instance_type not in EC2_INSTANCE_TYPES: + raise InvalidParameterValueException('Instance type {0} does not exist'.format(instance_type)) + + for sec_id in cr['securityGroupIds']: + if self.ec2_backend.get_security_group_from_id(sec_id) is None: + raise InvalidParameterValueException('security group {0} does not exist'.format(sec_id)) + if len(cr['securityGroupIds']) == 0: + raise InvalidParameterValueException('At least 1 security group must be provided') + + for subnet_id in cr['subnets']: + try: + self.ec2_backend.get_subnet(subnet_id) + except InvalidSubnetIdError: + raise InvalidParameterValueException('subnet {0} does not exist'.format(subnet_id)) + if len(cr['subnets']) == 0: + raise InvalidParameterValueException('At least 1 subnet must be provided') + + if cr['type'] not in ('EC2', 'SPOT'): + raise InvalidParameterValueException('computeResources.type must be either EC2 | SPOT') + + if cr['type'] == 'SPOT': + raise InternalFailure('SPOT NOT SUPPORTED YET') + + @staticmethod + def find_min_instances_to_meet_vcpus(instance_types, target): + """ + Finds the minimum needed instances to meed a vcpu target + + :param instance_types: Instance types, like ['t2.medium', 't2.small'] + :type instance_types: list of str + :param target: VCPU target + :type target: float + :return: List of instance types + :rtype: list of str + """ + # vcpus = [ (vcpus, instance_type), (vcpus, instance_type), ... 
]
+        instance_vcpus = []
+        instances = []
+
+        for instance_type in instance_types:
+            if instance_type == 'optimal':
+                instance_type = 'm4.4xlarge'
+
+            instance_vcpus.append(
+                (EC2_INSTANCE_TYPES[instance_type]['vcpus'], instance_type)
+            )
+
+        instance_vcpus = sorted(instance_vcpus, key=lambda item: item[0], reverse=True)
+        # Loop through,
+        #   if biggest instance type smaller than target, and len(instance_types) > 1, then use biggest type
+        #   if biggest instance type bigger than target, and len(instance_types) > 1, then remove it and move on
+
+        #   if biggest instance type bigger than target and len(instance_types) == 1 then add instance and finish
+        #   if biggest instance type smaller than target and len(instance_types) == 1 then loop adding instances until target == 0
+        #   ^^ boils down to: keep adding the last instance type until the target vCPU count drops to zero or below
+        # #Algorithm ;-) ... Could probably be done better with some quality lambdas
+        while target > 0:
+            current_vcpu, current_instance = instance_vcpus[0]
+
+            if len(instance_vcpus) > 1:
+                if current_vcpu <= target:
+                    target -= current_vcpu
+                    instances.append(current_instance)
+                else:
+                    # Try the next biggest instance type
+                    instance_vcpus.pop(0)
+            else:
+                # We're on the last instance type
+                target -= current_vcpu
+                instances.append(current_instance)
+
+        return instances
+
+    def delete_compute_environment(self, compute_environment_name):
+        if compute_environment_name is None:
+            raise InvalidParameterValueException('Missing computeEnvironment parameter')
+
+        compute_env = self.get_compute_environment(compute_environment_name)
+
+        if compute_env is not None:
+            # Pop ComputeEnvironment
+            self._compute_environments.pop(compute_env.arn)
+
+            # Delete ECS cluster
+            self.ecs_backend.delete_cluster(compute_env.ecs_name)
+
+            if compute_env.env_type == 'MANAGED':
+                # Terminate the compute environment's EC2 instances
+                instance_ids = [instance.id for instance in compute_env.instances]
+                self.ec2_backend.terminate_instances(instance_ids)
+
+    def update_compute_environment(self, compute_environment_name, state, compute_resources, service_role):
+        # Validate
+        compute_env = self.get_compute_environment(compute_environment_name)
+        if compute_env is None:
+            raise ClientException('Compute environment {0} does not exist'.format(compute_environment_name))
+
+        # Look for IAM role
+        if service_role is not None:
+            try:
+                role = self.iam_backend.get_role_by_arn(service_role)
+            except IAMNotFoundException:
+                raise InvalidParameterValueException('Could not find IAM role {0}'.format(service_role))
+
+            compute_env.service_role = role
+
+        if state is not None:
+            if state not in ('ENABLED', 'DISABLED'):
+                raise InvalidParameterValueException('state {0} must be one of ENABLED | DISABLED'.format(state))
+
+            compute_env.state = state
+
+        if compute_resources is not None:
+            # TODO Implement resizing of instances based on changing vCpus
+            # compute_resources CAN contain desiredvCpus, maxvCpus, minvCpus, and can contain none of them.
+ pass + + return compute_env.name, compute_env.arn + + def create_job_queue(self, queue_name, priority, state, compute_env_order): + """ + Create a job queue + + :param queue_name: Queue name + :type queue_name: str + :param priority: Queue priority + :type priority: int + :param state: Queue state + :type state: string + :param compute_env_order: Compute environment list + :type compute_env_order: list of dict + :return: Tuple of Name, ARN + :rtype: tuple of str + """ + for variable, var_name in ((queue_name, 'jobQueueName'), (priority, 'priority'), (state, 'state'), (compute_env_order, 'computeEnvironmentOrder')): + if variable is None: + raise ClientException('{0} must be provided'.format(var_name)) + + if state not in ('ENABLED', 'DISABLED'): + raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state)) + if self.get_job_queue_by_name(queue_name) is not None: + raise ClientException('Job queue {0} already exists'.format(queue_name)) + + if len(compute_env_order) == 0: + raise ClientException('At least 1 compute environment must be provided') + try: + # orders and extracts computeEnvironment names + ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])] + env_objects = [] + # Check each ARN exists, then make a list of compute env's + for arn in ordered_compute_environments: + env = self.get_compute_environment_by_arn(arn) + if env is None: + raise ClientException('Compute environment {0} does not exist'.format(arn)) + env_objects.append(env) + except Exception: + raise ClientException('computeEnvironmentOrder is malformed') + + # Create new Job Queue + queue = JobQueue(queue_name, priority, state, env_objects, compute_env_order, self.region_name) + self._job_queues[queue.arn] = queue + + return queue_name, queue.arn + + def describe_job_queues(self, job_queues=None, max_results=None, next_token=None): + envs = set() + if job_queues is not None: + envs = set(job_queues) + + result = [] + for arn, job_queue in self._job_queues.items(): + # Filter shortcut + if len(envs) > 0 and arn not in envs and job_queue.name not in envs: + continue + + result.append(job_queue.describe()) + + return result + + def update_job_queue(self, queue_name, priority, state, compute_env_order): + """ + Update a job queue + + :param queue_name: Queue name + :type queue_name: str + :param priority: Queue priority + :type priority: int + :param state: Queue state + :type state: string + :param compute_env_order: Compute environment list + :type compute_env_order: list of dict + :return: Tuple of Name, ARN + :rtype: tuple of str + """ + if queue_name is None: + raise ClientException('jobQueueName must be provided') + + job_queue = self.get_job_queue(queue_name) + if job_queue is None: + raise ClientException('Job queue {0} does not exist'.format(queue_name)) + + if state is not None: + if state not in ('ENABLED', 'DISABLED'): + raise ClientException('state {0} must be one of ENABLED | DISABLED'.format(state)) + + job_queue.state = state + + if compute_env_order is not None: + if len(compute_env_order) == 0: + raise ClientException('At least 1 compute environment must be provided') + try: + # orders and extracts computeEnvironment names + ordered_compute_environments = [item['computeEnvironment'] for item in sorted(compute_env_order, key=lambda x: x['order'])] + env_objects = [] + # Check each ARN exists, then make a list of compute env's + for arn in ordered_compute_environments: + env = 
self.get_compute_environment_by_arn(arn)
+                    if env is None:
+                        raise ClientException('Compute environment {0} does not exist'.format(arn))
+                    env_objects.append(env)
+            except Exception:
+                raise ClientException('computeEnvironmentOrder is malformed')
+
+            job_queue.env_order_json = compute_env_order
+            job_queue.environments = env_objects
+
+        if priority is not None:
+            job_queue.priority = priority
+
+        return queue_name, job_queue.arn
+
+    def delete_job_queue(self, queue_name):
+        job_queue = self.get_job_queue(queue_name)
+
+        if job_queue is not None:
+            del self._job_queues[job_queue.arn]
+
+    def register_job_definition(self, def_name, parameters, _type, retry_strategy, container_properties):
+        if def_name is None:
+            raise ClientException('jobDefinitionName must be provided')
+
+        job_def = self.get_job_definition_by_name(def_name)
+        if retry_strategy is not None:
+            try:
+                retry_strategy = retry_strategy['attempts']
+            except Exception:
+                raise ClientException('retryStrategy is malformed')
+
+        if job_def is None:
+            job_def = JobDefinition(def_name, parameters, _type, container_properties, region_name=self.region_name, retry_strategy=retry_strategy)
+        else:
+            # Make a new revision of the existing job definition
+            job_def = job_def.update(parameters, _type, container_properties, retry_strategy)
+
+        self._job_definitions[job_def.arn] = job_def
+
+        return def_name, job_def.arn, job_def.revision
+
+    def deregister_job_definition(self, def_name):
+        job_def = self.get_job_definition_by_arn(def_name)
+        if job_def is None and ':' in def_name:
+            name, revision = def_name.split(':', 1)
+            job_def = self.get_job_definition_by_name_revision(name, revision)
+
+        if job_def is not None:
+            del self._job_definitions[job_def.arn]
+
+    def describe_job_definitions(self, job_def_name=None, job_def_list=None, status=None, max_results=None, next_token=None):
+        jobs = []
+
+        # As a job name can reference multiple revisions, we get a list of them
+        if job_def_name is not None:
+            job_def = self.get_job_definitions(job_def_name)
+            if job_def is not None:
+                jobs.extend(job_def)
+        elif job_def_list is not None:
+            for job in job_def_list:
+                job_def = self.get_job_definitions(job)
+                if job_def is not None:
+                    jobs.extend(job_def)
+        else:
+            jobs.extend(self._job_definitions.values())
+
+        # Got all the job defs we're after, now filter them by status
+        if status is not None:
+            return [job for job in jobs if job.status == status]
+        return jobs
+
+    def submit_job(self, job_name, job_def_id, job_queue, parameters=None, retries=None, depends_on=None, container_overrides=None):
+        # TODO parameters, retries (which is a dict raw from the request), job dependencies and container overrides are ignored for now
+
+        # Look for job definition
+        job_def = self.get_job_definition_by_arn(job_def_id)
+        if job_def is None and ':' in job_def_id:
+            job_def = self.get_job_definition_by_name_revision(*job_def_id.split(':', 1))
+        if job_def is None:
+            raise ClientException('Job definition {0} does not exist'.format(job_def_id))
+
+        queue = self.get_job_queue(job_queue)
+        if queue is None:
+            raise ClientException('Job queue {0} does not exist'.format(job_queue))
+
+        job = Job(job_name, job_def, queue, log_backend=self.logs_backend)
+        self._jobs[job.job_id] = job
+
+        # Here comes the fun
+        job.start()
+
+        return job_name, job.job_id
+
+    def describe_jobs(self, jobs):
+        job_filter = set()
+        if jobs is not None:
+            job_filter = set(jobs)
+
+        result = []
+        for key, job in self._jobs.items():
+            if len(job_filter) > 0 and key not in job_filter:
+                continue
+
+            result.append(job.describe())
+
+        return result
+ + def list_jobs(self, job_queue, job_status=None, max_results=None, next_token=None): + jobs = [] + + job_queue = self.get_job_queue(job_queue) + if job_queue is None: + raise ClientException('Job queue {0} does not exist'.format(job_queue)) + + if job_status is not None and job_status not in ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING', 'SUCCEEDED', 'FAILED'): + raise ClientException('Job status is not one of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED') + + for job in job_queue.jobs: + if job_status is not None and job.job_state != job_status: + continue + + jobs.append(job) + + return jobs + + def terminate_job(self, job_id, reason): + if job_id is None: + raise ClientException('Job ID does not exist') + if reason is None: + raise ClientException('Reason does not exist') + + job = self.get_job_by_id(job_id) + if job is None: + raise ClientException('Job not found') + + job.terminate(reason) + + +available_regions = boto3.session.Session().get_available_regions("batch") +batch_backends = {region: BatchBackend(region_name=region) for region in available_regions} diff --git a/moto/batch/responses.py b/moto/batch/responses.py new file mode 100644 index 000000000..e626b7d4c --- /dev/null +++ b/moto/batch/responses.py @@ -0,0 +1,296 @@ +from __future__ import unicode_literals +from moto.core.responses import BaseResponse +from .models import batch_backends +from six.moves.urllib.parse import urlsplit + +from .exceptions import AWSError + +import json + + +class BatchResponse(BaseResponse): + def _error(self, code, message): + return json.dumps({'__type': code, 'message': message}), dict(status=400) + + @property + def batch_backend(self): + """ + :return: Batch Backend + :rtype: moto.batch.models.BatchBackend + """ + return batch_backends[self.region] + + @property + def json(self): + if self.body is None or self.body == '': + self._json = {} + elif not hasattr(self, '_json'): + try: + self._json = json.loads(self.body) + except json.JSONDecodeError: + print() + return self._json + + def _get_param(self, param_name, if_none=None): + val = self.json.get(param_name) + if val is not None: + return val + return if_none + + def _get_action(self): + # Return element after the /v1/* + return urlsplit(self.uri).path.lstrip('/').split('/')[1] + + # CreateComputeEnvironment + def createcomputeenvironment(self): + compute_env_name = self._get_param('computeEnvironmentName') + compute_resource = self._get_param('computeResources') + service_role = self._get_param('serviceRole') + state = self._get_param('state') + _type = self._get_param('type') + + try: + name, arn = self.batch_backend.create_compute_environment( + compute_environment_name=compute_env_name, + _type=_type, state=state, + compute_resources=compute_resource, + service_role=service_role + ) + except AWSError as err: + return err.response() + + result = { + 'computeEnvironmentArn': arn, + 'computeEnvironmentName': name + } + + return json.dumps(result) + + # DescribeComputeEnvironments + def describecomputeenvironments(self): + compute_environments = self._get_param('computeEnvironments') + max_results = self._get_param('maxResults') # Ignored, should be int + next_token = self._get_param('nextToken') # Ignored + + envs = self.batch_backend.describe_compute_environments(compute_environments, max_results=max_results, next_token=next_token) + + result = {'computeEnvironments': envs} + return json.dumps(result) + + # DeleteComputeEnvironment + def deletecomputeenvironment(self): + compute_environment 
= self._get_param('computeEnvironment') + + try: + self.batch_backend.delete_compute_environment(compute_environment) + except AWSError as err: + return err.response() + + return '' + + # UpdateComputeEnvironment + def updatecomputeenvironment(self): + compute_env_name = self._get_param('computeEnvironment') + compute_resource = self._get_param('computeResources') + service_role = self._get_param('serviceRole') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.update_compute_environment( + compute_environment_name=compute_env_name, + compute_resources=compute_resource, + service_role=service_role, + state=state + ) + except AWSError as err: + return err.response() + + result = { + 'computeEnvironmentArn': arn, + 'computeEnvironmentName': name + } + + return json.dumps(result) + + # CreateJobQueue + def createjobqueue(self): + compute_env_order = self._get_param('computeEnvironmentOrder') + queue_name = self._get_param('jobQueueName') + priority = self._get_param('priority') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.create_job_queue( + queue_name=queue_name, + priority=priority, + state=state, + compute_env_order=compute_env_order + ) + except AWSError as err: + return err.response() + + result = { + 'jobQueueArn': arn, + 'jobQueueName': name + } + + return json.dumps(result) + + # DescribeJobQueues + def describejobqueues(self): + job_queues = self._get_param('jobQueues') + max_results = self._get_param('maxResults') # Ignored, should be int + next_token = self._get_param('nextToken') # Ignored + + queues = self.batch_backend.describe_job_queues(job_queues, max_results=max_results, next_token=next_token) + + result = {'jobQueues': queues} + return json.dumps(result) + + # UpdateJobQueue + def updatejobqueue(self): + compute_env_order = self._get_param('computeEnvironmentOrder') + queue_name = self._get_param('jobQueue') + priority = self._get_param('priority') + state = self._get_param('state') + + try: + name, arn = self.batch_backend.update_job_queue( + queue_name=queue_name, + priority=priority, + state=state, + compute_env_order=compute_env_order + ) + except AWSError as err: + return err.response() + + result = { + 'jobQueueArn': arn, + 'jobQueueName': name + } + + return json.dumps(result) + + # DeleteJobQueue + def deletejobqueue(self): + queue_name = self._get_param('jobQueue') + + self.batch_backend.delete_job_queue(queue_name) + + return '' + + # RegisterJobDefinition + def registerjobdefinition(self): + container_properties = self._get_param('containerProperties') + def_name = self._get_param('jobDefinitionName') + parameters = self._get_param('parameters') + retry_strategy = self._get_param('retryStrategy') + _type = self._get_param('type') + + try: + name, arn, revision = self.batch_backend.register_job_definition( + def_name=def_name, + parameters=parameters, + _type=_type, + retry_strategy=retry_strategy, + container_properties=container_properties + ) + except AWSError as err: + return err.response() + + result = { + 'jobDefinitionArn': arn, + 'jobDefinitionName': name, + 'revision': revision + } + + return json.dumps(result) + + # DeregisterJobDefinition + def deregisterjobdefinition(self): + queue_name = self._get_param('jobDefinition') + + self.batch_backend.deregister_job_definition(queue_name) + + return '' + + # DescribeJobDefinitions + def describejobdefinitions(self): + job_def_name = self._get_param('jobDefinitionName') + job_def_list = self._get_param('jobDefinitions') + max_results = 
self._get_param('maxResults') + next_token = self._get_param('nextToken') + status = self._get_param('status') + + job_defs = self.batch_backend.describe_job_definitions(job_def_name, job_def_list, status, max_results, next_token) + + result = {'jobDefinitions': [job.describe() for job in job_defs]} + return json.dumps(result) + + # SubmitJob + def submitjob(self): + container_overrides = self._get_param('containerOverrides') + depends_on = self._get_param('dependsOn') + job_def = self._get_param('jobDefinition') + job_name = self._get_param('jobName') + job_queue = self._get_param('jobQueue') + parameters = self._get_param('parameters') + retries = self._get_param('retryStrategy') + + try: + name, job_id = self.batch_backend.submit_job( + job_name, job_def, job_queue, + parameters=parameters, + retries=retries, + depends_on=depends_on, + container_overrides=container_overrides + ) + except AWSError as err: + return err.response() + + result = { + 'jobId': job_id, + 'jobName': name, + } + + return json.dumps(result) + + # DescribeJobs + def describejobs(self): + jobs = self._get_param('jobs') + + try: + return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)}) + except AWSError as err: + return err.response() + + # ListJobs + def listjobs(self): + job_queue = self._get_param('jobQueue') + job_status = self._get_param('jobStatus') + max_results = self._get_param('maxResults') + next_token = self._get_param('nextToken') + + try: + jobs = self.batch_backend.list_jobs(job_queue, job_status, max_results, next_token) + except AWSError as err: + return err.response() + + result = {'jobSummaryList': [{'jobId': job.job_id, 'jobName': job.job_name} for job in jobs]} + return json.dumps(result) + + # TerminateJob + def terminatejob(self): + job_id = self._get_param('jobId') + reason = self._get_param('reason') + + try: + self.batch_backend.terminate_job(job_id, reason) + except AWSError as err: + return err.response() + + return '' + + # CancelJob + def canceljob(self): # Theres some AWS semantics on the differences but for us they're identical ;-) + return self.terminatejob() diff --git a/moto/batch/urls.py b/moto/batch/urls.py new file mode 100644 index 000000000..c64086ef2 --- /dev/null +++ b/moto/batch/urls.py @@ -0,0 +1,25 @@ +from __future__ import unicode_literals +from .responses import BatchResponse + +url_bases = [ + "https?://batch.(.+).amazonaws.com", +] + +url_paths = { + '{0}/v1/createcomputeenvironment$': BatchResponse.dispatch, + '{0}/v1/describecomputeenvironments$': BatchResponse.dispatch, + '{0}/v1/deletecomputeenvironment': BatchResponse.dispatch, + '{0}/v1/updatecomputeenvironment': BatchResponse.dispatch, + '{0}/v1/createjobqueue': BatchResponse.dispatch, + '{0}/v1/describejobqueues': BatchResponse.dispatch, + '{0}/v1/updatejobqueue': BatchResponse.dispatch, + '{0}/v1/deletejobqueue': BatchResponse.dispatch, + '{0}/v1/registerjobdefinition': BatchResponse.dispatch, + '{0}/v1/deregisterjobdefinition': BatchResponse.dispatch, + '{0}/v1/describejobdefinitions': BatchResponse.dispatch, + '{0}/v1/submitjob': BatchResponse.dispatch, + '{0}/v1/describejobs': BatchResponse.dispatch, + '{0}/v1/listjobs': BatchResponse.dispatch, + '{0}/v1/terminatejob': BatchResponse.dispatch, + '{0}/v1/canceljob': BatchResponse.dispatch, +} diff --git a/moto/batch/utils.py b/moto/batch/utils.py new file mode 100644 index 000000000..829a55f12 --- /dev/null +++ b/moto/batch/utils.py @@ -0,0 +1,22 @@ +from __future__ import unicode_literals + + +def make_arn_for_compute_env(account_id, name, 
region_name): + return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name) + + +def make_arn_for_job_queue(account_id, name, region_name): + return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name) + + +def make_arn_for_task_def(account_id, name, revision, region_name): + return "arn:aws:batch:{0}:{1}:job-definition/{2}:{3}".format(region_name, account_id, name, revision) + + +def lowercase_first_key(some_dict): + new_dict = {} + for key, value in some_dict.items(): + new_key = key[0].lower() + key[1:] + new_dict[new_key] = value + + return new_dict diff --git a/moto/cloudformation/parsing.py b/moto/cloudformation/parsing.py index 923ada058..05a408be1 100644 --- a/moto/cloudformation/parsing.py +++ b/moto/cloudformation/parsing.py @@ -8,6 +8,7 @@ import re from moto.autoscaling import models as autoscaling_models from moto.awslambda import models as lambda_models +from moto.batch import models as batch_models from moto.cloudwatch import models as cloudwatch_models from moto.datapipeline import models as datapipeline_models from moto.dynamodb import models as dynamodb_models @@ -31,6 +32,9 @@ from boto.cloudformation.stack import Output MODEL_MAP = { "AWS::AutoScaling::AutoScalingGroup": autoscaling_models.FakeAutoScalingGroup, "AWS::AutoScaling::LaunchConfiguration": autoscaling_models.FakeLaunchConfiguration, + "AWS::Batch::JobDefinition": batch_models.JobDefinition, + "AWS::Batch::JobQueue": batch_models.JobQueue, + "AWS::Batch::ComputeEnvironment": batch_models.ComputeEnvironment, "AWS::DynamoDB::Table": dynamodb_models.Table, "AWS::Kinesis::Stream": kinesis_models.Stream, "AWS::Lambda::EventSourceMapping": lambda_models.EventSourceMapping, diff --git a/moto/elbv2/urls.py b/moto/elbv2/urls.py index 13a8e056f..af51f7d3a 100644 --- a/moto/elbv2/urls.py +++ b/moto/elbv2/urls.py @@ -1,10 +1,10 @@ from __future__ import unicode_literals -from .responses import ELBV2Response +from ..elb.urls import api_version_elb_backend url_bases = [ "https?://elasticloadbalancing.(.+).amazonaws.com", ] url_paths = { - '{0}/$': ELBV2Response.dispatch, + '{0}/$': api_version_elb_backend, } diff --git a/moto/iam/models.py b/moto/iam/models.py index 18ed513b4..22bdfdb4b 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -528,6 +528,12 @@ class IAMBackend(BaseBackend): return role raise IAMNotFoundException("Role {0} not found".format(role_name)) + def get_role_by_arn(self, arn): + for role in self.get_roles(): + if role.arn == arn: + return role + raise IAMNotFoundException("Role {0} not found".format(arn)) + def delete_role(self, role_name): for role in self.get_roles(): if role.name == role_name: diff --git a/moto/logs/models.py b/moto/logs/models.py index 14f511932..09dcb3645 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -22,6 +22,13 @@ class LogEvent: "timestamp": self.timestamp } + def to_response_dict(self): + return { + "ingestionTime": self.ingestionTime, + "message": self.message, + "timestamp": self.timestamp + } + class LogStream: _log_ids = 0 @@ -41,7 +48,14 @@ class LogStream: self.__class__._log_ids += 1 + def _update(self): + self.firstEventTimestamp = min([x.timestamp for x in self.events]) + self.lastEventTimestamp = max([x.timestamp for x in self.events]) + def to_describe_dict(self): + # Compute start and end times + self._update() + return { "arn": self.arn, "creationTime": self.creationTime, @@ -79,7 +93,7 @@ class LogStream: if next_token is None: next_token = 0 - events_page = events[next_token: 
next_token + limit] + events_page = [event.to_response_dict() for event in events[next_token: next_token + limit]] next_token += limit if next_token >= len(self.events): next_token = None @@ -120,17 +134,17 @@ class LogGroup: del self.streams[log_stream_name] def describe_log_streams(self, descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by): - log_streams = [stream.to_describe_dict() for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)] + log_streams = [(name, stream.to_describe_dict()) for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)] - def sorter(stream): - return stream.name if order_by == 'logStreamName' else stream.lastEventTimestamp + def sorter(item): + return item[0] if order_by == 'logStreamName' else item[1]['lastEventTimestamp'] if next_token is None: next_token = 0 log_streams = sorted(log_streams, key=sorter, reverse=descending) new_token = next_token + limit - log_streams_page = log_streams[next_token: new_token] + log_streams_page = [x[1] for x in log_streams[next_token: new_token]] if new_token >= len(log_streams): new_token = None diff --git a/moto/logs/responses.py b/moto/logs/responses.py index 4cb9caa6a..53b2390f4 100644 --- a/moto/logs/responses.py +++ b/moto/logs/responses.py @@ -47,7 +47,7 @@ class LogsResponse(BaseResponse): def describe_log_streams(self): log_group_name = self._get_param('logGroupName') - log_stream_name_prefix = self._get_param('logStreamNamePrefix') + log_stream_name_prefix = self._get_param('logStreamNamePrefix', '') descending = self._get_param('descending', False) limit = self._get_param('limit', 50) assert limit <= 50 @@ -83,7 +83,7 @@ class LogsResponse(BaseResponse): limit = self._get_param('limit', 10000) assert limit <= 10000 next_token = self._get_param('nextToken') - start_from_head = self._get_param('startFromHead') + start_from_head = self._get_param('startFromHead', False) events, next_backward_token, next_foward_token = \ self.logs_backend.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head) diff --git a/tests/test_batch/test_batch.py b/tests/test_batch/test_batch.py new file mode 100644 index 000000000..ec24cd911 --- /dev/null +++ b/tests/test_batch/test_batch.py @@ -0,0 +1,809 @@ +from __future__ import unicode_literals + +import time +import datetime +import boto3 +from botocore.exceptions import ClientError +import sure # noqa +from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs +import functools +import nose + + +def expected_failure(test): + @functools.wraps(test) + def inner(*args, **kwargs): + try: + test(*args, **kwargs) + except Exception as err: + raise nose.SkipTest + return inner + +DEFAULT_REGION = 'eu-central-1' + + +def _get_clients(): + return boto3.client('ec2', region_name=DEFAULT_REGION), \ + boto3.client('iam', region_name=DEFAULT_REGION), \ + boto3.client('ecs', region_name=DEFAULT_REGION), \ + boto3.client('logs', region_name=DEFAULT_REGION), \ + boto3.client('batch', region_name=DEFAULT_REGION) + + +def _setup(ec2_client, iam_client): + """ + Do prerequisite setup + :return: VPC ID, Subnet ID, Security group ID, IAM Role ARN + :rtype: tuple + """ + resp = ec2_client.create_vpc(CidrBlock='172.30.0.0/24') + vpc_id = resp['Vpc']['VpcId'] + resp = ec2_client.create_subnet( + AvailabilityZone='eu-central-1a', + CidrBlock='172.30.0.0/25', + VpcId=vpc_id + ) + subnet_id = resp['Subnet']['SubnetId'] + resp = ec2_client.create_security_group( + 
Description='test_sg_desc', + GroupName='test_sg', + VpcId=vpc_id + ) + sg_id = resp['GroupId'] + + resp = iam_client.create_role( + RoleName='TestRole', + AssumeRolePolicyDocument='some_policy' + ) + iam_arn = resp['Role']['Arn'] + + return vpc_id, subnet_id, sg_id, iam_arn + + +# Yes, yes it talks to all the things +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_managed_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='MANAGED', + state='ENABLED', + computeResources={ + 'type': 'EC2', + 'minvCpus': 5, + 'maxvCpus': 10, + 'desiredvCpus': 5, + 'instanceTypes': [ + 't2.small', + 't2.medium' + ], + 'imageId': 'some_image_id', + 'subnets': [ + subnet_id, + ], + 'securityGroupIds': [ + sg_id, + ], + 'ec2KeyPair': 'string', + 'instanceRole': iam_arn, + 'tags': { + 'string': 'string' + }, + 'bidPercentage': 123, + 'spotIamFleetRole': 'string' + }, + serviceRole=iam_arn + ) + resp.should.contain('computeEnvironmentArn') + resp['computeEnvironmentName'].should.equal(compute_name) + + # Given a t2.medium is 2 vcpu and t2.small is 1, therefore 2 mediums and 1 small should be created + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(3) + + # Should have created 1 ECS cluster + resp = ecs_client.list_clusters() + resp.should.contain('clusterArns') + len(resp['clusterArns']).should.equal(1) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_unmanaged_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + resp.should.contain('computeEnvironmentArn') + resp['computeEnvironmentName'].should.equal(compute_name) + + # Its unmanaged so no instances should be created + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(0) + + # Should have created 1 ECS cluster + resp = ecs_client.list_clusters() + resp.should.contain('clusterArns') + len(resp['clusterArns']).should.equal(1) + +# TODO create 1000s of tests to test complex option combinations of create environment + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_describe_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(1) + resp['computeEnvironments'][0]['computeEnvironmentName'].should.equal(compute_name) + + # Test filtering + resp = batch_client.describe_compute_environments( + computeEnvironments=['test1'] + ) + len(resp['computeEnvironments']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_unmanaged_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, 
batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + batch_client.delete_compute_environment( + computeEnvironment=compute_name, + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(0) + + resp = ecs_client.list_clusters() + len(resp.get('clusterArns', [])).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_managed_compute_environment(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='MANAGED', + state='ENABLED', + computeResources={ + 'type': 'EC2', + 'minvCpus': 5, + 'maxvCpus': 10, + 'desiredvCpus': 5, + 'instanceTypes': [ + 't2.small', + 't2.medium' + ], + 'imageId': 'some_image_id', + 'subnets': [ + subnet_id, + ], + 'securityGroupIds': [ + sg_id, + ], + 'ec2KeyPair': 'string', + 'instanceRole': iam_arn, + 'tags': { + 'string': 'string' + }, + 'bidPercentage': 123, + 'spotIamFleetRole': 'string' + }, + serviceRole=iam_arn + ) + + batch_client.delete_compute_environment( + computeEnvironment=compute_name, + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(0) + + resp = ec2_client.describe_instances() + resp.should.contain('Reservations') + len(resp['Reservations']).should.equal(3) + for reservation in resp['Reservations']: + reservation['Instances'][0]['State']['Name'].should.equal('terminated') + + resp = ecs_client.list_clusters() + len(resp.get('clusterArns', [])).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_update_unmanaged_compute_environment_state(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + + batch_client.update_compute_environment( + computeEnvironment=compute_name, + state='DISABLED' + ) + + resp = batch_client.describe_compute_environments() + len(resp['computeEnvironments']).should.equal(1) + resp['computeEnvironments'][0]['state'].should.equal('DISABLED') + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + resp.should.contain('jobQueueArn') + resp.should.contain('jobQueueName') + queue_arn = resp['jobQueueArn'] + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(1) + 
resp['jobQueues'][0]['jobQueueArn'].should.equal(queue_arn) + + resp = batch_client.describe_job_queues(jobQueues=['test_invalid_queue']) + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_job_queue_bad_arn(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + try: + batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + 'LALALA' + }, + ] + ) + raise RuntimeError('create_job_queue should have raised ClientException') + except ClientError as err: + err.response['Error']['Code'].should.equal('ClientException') + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_update_job_queue(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + batch_client.update_job_queue( + jobQueue=queue_arn, + priority=5 + ) + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(1) + resp['jobQueues'][0]['priority'].should.equal(5) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_job_queue(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + batch_client.delete_job_queue( + jobQueue=queue_arn + ) + + resp = batch_client.describe_job_queues() + resp.should.contain('jobQueues') + len(resp['jobQueues']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_register_task_definition(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + resp.should.contain('jobDefinitionArn') + resp.should.contain('jobDefinitionName') + resp.should.contain('revision') + + assert resp['jobDefinitionArn'].endswith('{0}:{1}'.format(resp['jobDefinitionName'], resp['revision'])) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def 
test_reregister_task_definition(): + # Reregistering task with the same name bumps the revision number + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp1 = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + resp1.should.contain('jobDefinitionArn') + resp1.should.contain('jobDefinitionName') + resp1.should.contain('revision') + + assert resp1['jobDefinitionArn'].endswith('{0}:{1}'.format(resp1['jobDefinitionName'], resp1['revision'])) + resp1['revision'].should.equal(1) + + resp2 = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 68, + 'command': ['sleep', '10'] + } + ) + resp2['revision'].should.equal(2) + + resp2['jobDefinitionArn'].should_not.equal(resp1['jobDefinitionArn']) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_delete_task_definition(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + + batch_client.deregister_job_definition(jobDefinition=resp['jobDefinitionArn']) + + resp = batch_client.describe_job_definitions() + len(resp['jobDefinitions']).should.equal(0) + + +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_describe_task_definition(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 64, + 'command': ['sleep', '10'] + } + ) + batch_client.register_job_definition( + jobDefinitionName='test1', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 64, + 'command': ['sleep', '10'] + } + ) + + resp = batch_client.describe_job_definitions( + jobDefinitionName='sleep10' + ) + len(resp['jobDefinitions']).should.equal(2) + + resp = batch_client.describe_job_definitions() + len(resp['jobDefinitions']).should.equal(3) + + resp = batch_client.describe_job_definitions( + jobDefinitions=['sleep10', 'test1'] + ) + len(resp['jobDefinitions']).should.equal(3) + + +# SLOW TESTS +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_submit_job(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 
'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id = resp['jobId'] + + future = datetime.datetime.now() + datetime.timedelta(seconds=30) + + while datetime.datetime.now() < future: + resp = batch_client.describe_jobs(jobs=[job_id]) + print("{0}:{1} {2}".format(resp['jobs'][0]['jobName'], resp['jobs'][0]['jobId'], resp['jobs'][0]['status'])) + + if resp['jobs'][0]['status'] == 'FAILED': + raise RuntimeError('Batch job failed') + if resp['jobs'][0]['status'] == 'SUCCEEDED': + break + time.sleep(0.5) + else: + raise RuntimeError('Batch job timed out') + + resp = logs_client.describe_log_streams(logGroupName='/aws/batch/job') + len(resp['logStreams']).should.equal(1) + ls_name = resp['logStreams'][0]['logStreamName'] + + resp = logs_client.get_log_events(logGroupName='/aws/batch/job', logStreamName=ls_name) + len(resp['events']).should.be.greater_than(5) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_list_jobs(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id1 = resp['jobId'] + resp = batch_client.submit_job( + jobName='test2', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id2 = resp['jobId'] + + future = datetime.datetime.now() + datetime.timedelta(seconds=30) + + resp_finished_jobs = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + # Wait only as long as it takes to run the jobs + while datetime.datetime.now() < future: + resp = batch_client.describe_jobs(jobs=[job_id1, job_id2]) + + any_failed_jobs = any([job['status'] == 'FAILED' for job in resp['jobs']]) + succeeded_jobs = all([job['status'] == 'SUCCEEDED' for job in resp['jobs']]) + + if any_failed_jobs: + raise RuntimeError('A Batch job failed') + if succeeded_jobs: + break + time.sleep(0.5) + else: + raise RuntimeError('Batch jobs timed out') + + resp_finished_jobs2 = batch_client.list_jobs( + jobQueue=queue_arn, + jobStatus='SUCCEEDED' + ) + + len(resp_finished_jobs['jobSummaryList']).should.equal(0) + len(resp_finished_jobs2['jobSummaryList']).should.equal(2) + + +@expected_failure +@mock_logs +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_terminate_job(): + ec2_client, iam_client, ecs_client, logs_client, 
batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + compute_name = 'test_compute_env' + resp = batch_client.create_compute_environment( + computeEnvironmentName=compute_name, + type='UNMANAGED', + state='ENABLED', + serviceRole=iam_arn + ) + arn = resp['computeEnvironmentArn'] + + resp = batch_client.create_job_queue( + jobQueueName='test_job_queue', + state='ENABLED', + priority=123, + computeEnvironmentOrder=[ + { + 'order': 123, + 'computeEnvironment': arn + }, + ] + ) + queue_arn = resp['jobQueueArn'] + + resp = batch_client.register_job_definition( + jobDefinitionName='sleep10', + type='container', + containerProperties={ + 'image': 'busybox', + 'vcpus': 1, + 'memory': 128, + 'command': ['sleep', '10'] + } + ) + job_def_arn = resp['jobDefinitionArn'] + + resp = batch_client.submit_job( + jobName='test1', + jobQueue=queue_arn, + jobDefinition=job_def_arn + ) + job_id = resp['jobId'] + + time.sleep(2) + + batch_client.terminate_job(jobId=job_id, reason='test_terminate') + + time.sleep(1) + + resp = batch_client.describe_jobs(jobs=[job_id]) + resp['jobs'][0]['jobName'].should.equal('test1') + resp['jobs'][0]['status'].should.equal('FAILED') + resp['jobs'][0]['statusReason'].should.equal('test_terminate') + diff --git a/tests/test_batch/test_cloudformation.py b/tests/test_batch/test_cloudformation.py new file mode 100644 index 000000000..1e37aa3a6 --- /dev/null +++ b/tests/test_batch/test_cloudformation.py @@ -0,0 +1,247 @@ +from __future__ import unicode_literals + +import time +import datetime +import boto3 +from botocore.exceptions import ClientError +import sure # noqa +from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs, mock_cloudformation +import functools +import nose +import json + +DEFAULT_REGION = 'eu-central-1' + + +def _get_clients(): + return boto3.client('ec2', region_name=DEFAULT_REGION), \ + boto3.client('iam', region_name=DEFAULT_REGION), \ + boto3.client('ecs', region_name=DEFAULT_REGION), \ + boto3.client('logs', region_name=DEFAULT_REGION), \ + boto3.client('batch', region_name=DEFAULT_REGION) + + +def _setup(ec2_client, iam_client): + """ + Do prerequisite setup + :return: VPC ID, Subnet ID, Security group ID, IAM Role ARN + :rtype: tuple + """ + resp = ec2_client.create_vpc(CidrBlock='172.30.0.0/24') + vpc_id = resp['Vpc']['VpcId'] + resp = ec2_client.create_subnet( + AvailabilityZone='eu-central-1a', + CidrBlock='172.30.0.0/25', + VpcId=vpc_id + ) + subnet_id = resp['Subnet']['SubnetId'] + resp = ec2_client.create_security_group( + Description='test_sg_desc', + GroupName='test_sg', + VpcId=vpc_id + ) + sg_id = resp['GroupId'] + + resp = iam_client.create_role( + RoleName='TestRole', + AssumeRolePolicyDocument='some_policy' + ) + iam_arn = resp['Role']['Arn'] + + return vpc_id, subnet_id, sg_id, iam_arn + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_env_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + } + } + } + cf_json = 
json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + + stack_resources['StackResourceSummaries'][0]['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN + assert stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].startswith('arn:aws:batch:') + stack_resources['StackResourceSummaries'][0]['PhysicalResourceId'].should.contain('test_stack') + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_queue_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + }, + + "JobQueue": { + "Type": "AWS::Batch::JobQueue", + "Properties": { + "Priority": 1, + "ComputeEnvironmentOrder": [ + { + "Order": 1, + "ComputeEnvironment": {"Ref": "ComputeEnvironment"} + } + ] + } + }, + } + } + cf_json = json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + len(stack_resources['StackResourceSummaries']).should.equal(2) + + job_queue_resource = list(filter(lambda item: item['ResourceType'] == 'AWS::Batch::JobQueue', stack_resources['StackResourceSummaries']))[0] + + job_queue_resource['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN + assert job_queue_resource['PhysicalResourceId'].startswith('arn:aws:batch:') + job_queue_resource['PhysicalResourceId'].should.contain('test_stack') + job_queue_resource['PhysicalResourceId'].should.contain('job-queue/') + + +@mock_cloudformation() +@mock_ec2 +@mock_ecs +@mock_iam +@mock_batch +def test_create_job_def_cf(): + ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients() + vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client) + + create_environment_template = { + 'Resources': { + "ComputeEnvironment": { + "Type": "AWS::Batch::ComputeEnvironment", + "Properties": { + "Type": "MANAGED", + "ComputeResources": { + "Type": "EC2", + "MinvCpus": 0, + "DesiredvCpus": 0, + "MaxvCpus": 64, + "InstanceTypes": [ + "optimal" + ], + "Subnets": [subnet_id], + "SecurityGroupIds": [sg_id], + "InstanceRole": iam_arn + }, + "ServiceRole": iam_arn + } + }, + + "JobQueue": { + "Type": "AWS::Batch::JobQueue", + "Properties": { + "Priority": 1, + "ComputeEnvironmentOrder": [ + { + "Order": 1, + "ComputeEnvironment": {"Ref": "ComputeEnvironment"} + } + ] + } + }, + + "JobDefinition": { + "Type": "AWS::Batch::JobDefinition", + "Properties": { + "Type": "container", + "ContainerProperties": { + "Image": { + "Fn::Join": ["", ["137112412989.dkr.ecr.", {"Ref": "AWS::Region"}, ".amazonaws.com/amazonlinux:latest"]] + }, + "Vcpus": 2, + "Memory": 2000, + "Command": ["echo", "Hello world"] + }, + "RetryStrategy": { + "Attempts": 1 + } + } + }, 
+ } + } + cf_json = json.dumps(create_environment_template) + + cf_conn = boto3.client('cloudformation', DEFAULT_REGION) + stack_id = cf_conn.create_stack( + StackName='test_stack', + TemplateBody=cf_json, + )['StackId'] + + stack_resources = cf_conn.list_stack_resources(StackName=stack_id) + len(stack_resources['StackResourceSummaries']).should.equal(3) + + job_def_resource = list(filter(lambda item: item['ResourceType'] == 'AWS::Batch::JobDefinition', stack_resources['StackResourceSummaries']))[0] + + job_def_resource['ResourceStatus'].should.equal('CREATE_COMPLETE') + # Spot checks on the ARN + assert job_def_resource['PhysicalResourceId'].startswith('arn:aws:batch:') + job_def_resource['PhysicalResourceId'].should.contain('test_stack-JobDef') + job_def_resource['PhysicalResourceId'].should.contain('job-definition/') diff --git a/tests/test_batch/test_server.py b/tests/test_batch/test_server.py new file mode 100644 index 000000000..4a74260a8 --- /dev/null +++ b/tests/test_batch/test_server.py @@ -0,0 +1,19 @@ +from __future__ import unicode_literals + +import sure # noqa + +import moto.server as server +from moto import mock_batch + +''' +Test the different server responses +''' + + +@mock_batch +def test_batch_list(): + backend = server.create_backend_app("batch") + test_client = backend.test_client() + + res = test_client.get('/v1/describecomputeenvironments') + res.status_code.should.equal(200)
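Reviewer note (not part of the patch): the /v1/describecomputeenvironments route exercised in-process above can also be hit out of process against moto's standalone server. A minimal sketch, assuming a server started locally with 'moto_server batch -p 5000'; the host, port and dummy credentials are illustrative assumptions only:

import boto3

# Point a real boto3 Batch client at the locally running moto server (hypothetical endpoint).
batch_client = boto3.client(
    'batch',
    region_name='eu-central-1',
    endpoint_url='http://localhost:5000',
    aws_access_key_id='fake',
    aws_secret_access_key='fake',
)

# A fresh backend should report no compute environments yet.
resp = batch_client.describe_compute_environments()
assert resp['computeEnvironments'] == []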