Feature: Dockerless Batch (#5100)

Bert Blommers 2022-05-06 09:45:56 +00:00 committed by GitHub
parent c2727a7c20
commit beb05662e4
12 changed files with 619 additions and 60 deletions

View File

@@ -12,6 +12,8 @@
batch
=====
.. autoclass:: moto.batch.models.BatchBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@@ -28,21 +30,6 @@ batch
- [X] cancel_job
- [X] create_compute_environment
- [X] create_job_queue
Create a job queue
:param queue_name: Queue name
:type queue_name: str
:param priority: Queue priority
:type priority: int
:param state: Queue state
:type state: string
:param compute_env_order: Compute environment list
:type compute_env_order: list of dict
:return: Tuple of Name, ARN
:rtype: tuple of str
- [ ] create_scheduling_policy
- [X] delete_compute_environment
- [X] delete_job_queue
@@ -83,20 +70,5 @@ batch
- [X] untag_resource
- [X] update_compute_environment
- [X] update_job_queue
Update a job queue
:param queue_name: Queue name
:type queue_name: str
:param priority: Queue priority
:type priority: int
:param state: Queue state
:type state: string
:param compute_env_order: Compute environment list
:type compute_env_order: list of dict
:return: Tuple of Name, ARN
:rtype: tuple of str
- [ ] update_scheduling_policy

View File

@@ -50,6 +50,12 @@ mock_lambda = lazy_load(
    ".awslambda", "mock_lambda", boto3_name="lambda", backend="lambda_backends"
)
mock_batch = lazy_load(".batch", "mock_batch")
mock_batch_simple = lazy_load(
    ".batch_simple",
    "mock_batch_simple",
    boto3_name="batch",
    backend="batch_simple_backends",
)
mock_budgets = lazy_load(".budgets", "mock_budgets")
mock_cloudformation = lazy_load(".cloudformation", "mock_cloudformation")
mock_cloudfront = lazy_load(".cloudfront", "mock_cloudfront")
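For context: `lazy_load` (defined earlier in moto/__init__.py) defers importing the submodule until the decorator is first used, so registering `mock_batch_simple` here costs nothing at import time. Generically, the pattern looks something like the following simplified sketch (not moto's exact implementation):

import importlib


def lazy_load(module_name, element):
    """Return a stand-in that imports `module_name` only on first call."""

    def f(*args, **kwargs):
        # Resolve the relative module (e.g. ".batch_simple") lazily
        module = importlib.import_module(module_name, "moto")
        return getattr(module, element)(*args, **kwargs)

    return f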

View File

@@ -5,7 +5,6 @@ import datetime
import time
import uuid
import logging
import docker
import threading
import dateutil.parser
from sys import platform
@@ -568,6 +567,8 @@ class Job(threading.Thread, BaseModel, DockerModel, ManagedState):
        :return:
        """
        try:
            import docker

            self.advance()
            while self.status == "SUBMITTED":
                # Wait until we've moved onto state 'PENDING'
@@ -817,6 +818,14 @@ class Job(threading.Thread, BaseModel, DockerModel, ManagedState):

class BatchBackend(BaseBackend):
    """
    Batch jobs are executed inside a Docker container. Every time the `submit_job` method is called, a new Docker container is started.
    A job is marked as 'Success' when the Docker container exits without throwing an error.

    Use `@mock_batch_simple` instead if you do not want to use a Docker container.
    With this decorator, jobs are simply marked as 'Success' without trying to execute any commands/scripts.
    """

    def __init__(self, region_name=None):
        super().__init__()
        self.region_name = region_name
@@ -1296,20 +1305,6 @@ class BatchBackend(BaseBackend):
    def create_job_queue(
        self, queue_name, priority, state, compute_env_order, tags=None
    ):
        """
        Create a job queue
        :param queue_name: Queue name
        :type queue_name: str
        :param priority: Queue priority
        :type priority: int
        :param state: Queue state
        :type state: string
        :param compute_env_order: Compute environment list
        :type compute_env_order: list of dict
        :return: Tuple of Name, ARN
        :rtype: tuple of str
        """
        for variable, var_name in (
            (queue_name, "jobQueueName"),
            (priority, "priority"),
@@ -1380,20 +1375,6 @@ class BatchBackend(BaseBackend):
        return result

    def update_job_queue(self, queue_name, priority, state, compute_env_order):
        """
        Update a job queue
        :param queue_name: Queue name
        :type queue_name: str
        :param priority: Queue priority
        :type priority: int
        :param state: Queue state
        :type state: string
        :param compute_env_order: Compute environment list
        :type compute_env_order: list of dict
        :return: Tuple of Name, ARN
        :rtype: tuple of str
        """
        if queue_name is None:
            raise ClientException("jobQueueName must be provided")

View File

@@ -0,0 +1,5 @@
from .models import batch_simple_backends
from ..core.models import base_decorator
batch_backend = batch_simple_backends["us-east-1"]
mock_batch_simple = base_decorator(batch_simple_backends)
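For context, a minimal usage sketch of the decorator defined above (not part of this commit). It mirrors test_submit_job_by_name further down; the role, queue, and job-definition names are illustrative only:

import boto3
from moto import mock_iam, mock_batch_simple


# Hypothetical example: with @mock_batch_simple, no Docker daemon is needed
@mock_iam
@mock_batch_simple
def test_submit_job_without_docker():
    iam = boto3.client("iam", region_name="us-east-1")
    role_arn = iam.create_role(
        RoleName="my-role", AssumeRolePolicyDocument="some_policy"
    )["Role"]["Arn"]

    batch = boto3.client("batch", region_name="us-east-1")
    env_arn = batch.create_compute_environment(
        computeEnvironmentName="my-env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=role_arn,
    )["computeEnvironmentArn"]
    queue_arn = batch.create_job_queue(
        jobQueueName="my-queue",
        state="ENABLED",
        priority=1,
        computeEnvironmentOrder=[{"order": 1, "computeEnvironment": env_arn}],
    )["jobQueueArn"]
    batch.register_job_definition(
        jobDefinitionName="my-job-def",
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 512,
            "command": ["sleep", "10"],
        },
    )

    job_id = batch.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition="my-job-def"
    )["jobId"]
    # The command never runs, but the job is still reported as SUCCEEDED
    status = batch.describe_jobs(jobs=[job_id])["jobs"][0]["status"]
    assert status == "SUCCEEDED"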

View File

@@ -0,0 +1,85 @@
from ..batch.models import batch_backends, BaseBackend, Job, ClientException
from ..core.utils import BackendDict
import datetime
class BatchSimpleBackend(BaseBackend):
    """
    Implements a Batch backend that does not use Docker containers. Submitted jobs are simply marked as 'Success'.
    Use the `@mock_batch_simple` decorator to use this class.
    """

    def __init__(self, region_name=None):
        self.region_name = region_name

    @property
    def backend(self):
        return batch_backends[self.region_name]

    def __getattribute__(self, name):
        """
        Magic part that makes this class behave like a wrapper around the regular batch_backend.
        We intercept calls to `submit_job` and replace them with our own (non-Docker) implementation.
        Every other method call is sent through to batch_backend.
        """
        if name in [
            "backend",
            "region_name",
            "urls",
            "_url_module",
            "__class__",
            "url_bases",
        ]:
            return object.__getattribute__(self, name)
        if name in ["submit_job"]:

            def newfunc(*args, **kwargs):
                attr = object.__getattribute__(self, name)
                return attr(*args, **kwargs)

            return newfunc
        else:
            return object.__getattribute__(self.backend, name)
    def submit_job(
        self,
        job_name,
        job_def_id,
        job_queue,
        depends_on=None,
        container_overrides=None,
        timeout=None,
    ):
        # Look for job definition
        job_def = self.get_job_definition(job_def_id)
        if job_def is None:
            raise ClientException(
                "Job definition {0} does not exist".format(job_def_id)
            )

        queue = self.get_job_queue(job_queue)
        if queue is None:
            raise ClientException("Job queue {0} does not exist".format(job_queue))

        job = Job(
            job_name,
            job_def,
            queue,
            log_backend=self.logs_backend,
            container_overrides=container_overrides,
            depends_on=depends_on,
            all_jobs=self._jobs,
            timeout=timeout,
        )
        self.backend._jobs[job.job_id] = job

        # We don't want to actually run the job - just mark it as succeeded
        job.job_started_at = datetime.datetime.now()
        job._start_attempt()
        job._mark_stopped(success=True)

        return job_name, job.job_id
batch_simple_backends = BackendDict(BatchSimpleBackend, "batch")
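The `__getattribute__` override above is the core trick: `submit_job` is intercepted while everything else is served by the wrapped `BatchBackend`. A stripped-down, self-contained illustration of the same delegation pattern (generic Python, unrelated to moto's classes):

class Real:
    def greet(self):
        return "real greet"

    def work(self):
        return "real work"


class Wrapper:
    """Intercept work(); forward every other attribute to the wrapped object."""

    def __init__(self, real):
        # Bypass our own __getattribute__ to avoid infinite recursion
        object.__setattr__(self, "_real", real)

    def __getattribute__(self, name):
        if name in ("_real", "__class__"):
            return object.__getattribute__(self, name)
        if name == "work":

            def intercepted(*args, **kwargs):
                return "intercepted"

            return intercepted
        # Everything else is looked up on the wrapped instance
        return getattr(object.__getattribute__(self, "_real"), name)


w = Wrapper(Real())
assert w.greet() == "real greet"  # delegated
assert w.work() == "intercepted"  # intercepted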

View File

@@ -0,0 +1,12 @@
from ..batch.responses import BatchResponse
from .models import batch_simple_backends


class BatchSimpleResponse(BatchResponse):
    @property
    def batch_backend(self):
        """
        :return: Batch Backend
        :rtype: moto.batch_simple.models.BatchSimpleBackend
        """
        return batch_simple_backends[self.region]

View File

@@ -0,0 +1,6 @@
from ..batch.urls import url_bases as batch_url_bases
from ..batch.urls import url_paths as batch_url_paths
from .responses import BatchSimpleResponse
url_bases = batch_url_bases.copy()
url_paths = {k: BatchSimpleResponse.dispatch for k in batch_url_paths}

View File

@@ -1,4 +1,3 @@
import docker
import functools
import requests.adapters
@@ -17,6 +16,8 @@ class DockerModel:
        if self.__docker_client is None:
            # We should only initiate the Docker Client at runtime.
            # The docker.from_env() call will fail if Docker is not running
            import docker

            self.__docker_client = docker.from_env()
        # Unfortunately mocking replaces this method w/o fallback enabled, so we
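The same deferred-import idea in isolation: pushing an optional dependency's import from module load time to call time, so the package can be imported on machines without the docker SDK installed. A generic sketch, not moto's exact code:

_docker_client = None


def get_docker_client():
    """Create the Docker client lazily; importing this module never requires docker."""
    global _docker_client
    if _docker_client is None:
        import docker  # deferred: only evaluated when a job actually needs Docker

        _docker_client = docker.from_env()  # raises here if the daemon is unreachable
    return _docker_client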

View File

View File

@@ -0,0 +1,278 @@
import boto3
import json
import sure # noqa # pylint: disable=unused-import
from moto import mock_iam, mock_ec2, mock_ecs, mock_cloudformation
from moto import mock_batch_simple as mock_batch_without_docker
from uuid import uuid4
# Copy of test_batch/test_batch_cloudformation
# Except that we verify this behaviour still works without docker
DEFAULT_REGION = "eu-central-1"
def _get_clients():
    return (
        boto3.client("ec2", region_name=DEFAULT_REGION),
        boto3.client("iam", region_name=DEFAULT_REGION),
        boto3.client("ecs", region_name=DEFAULT_REGION),
        boto3.client("logs", region_name=DEFAULT_REGION),
        boto3.client("batch", region_name=DEFAULT_REGION),
    )
def _setup(ec2_client, iam_client):
    """
    Do prerequisite setup
    :return: VPC ID, Subnet ID, Security group ID, IAM Role ARN
    :rtype: tuple
    """
    resp = ec2_client.create_vpc(CidrBlock="172.30.0.0/24")
    vpc_id = resp["Vpc"]["VpcId"]
    resp = ec2_client.create_subnet(
        AvailabilityZone="eu-central-1a", CidrBlock="172.30.0.0/25", VpcId=vpc_id
    )
    subnet_id = resp["Subnet"]["SubnetId"]
    resp = ec2_client.create_security_group(
        Description="test_sg_desc", GroupName=str(uuid4())[0:6], VpcId=vpc_id
    )
    sg_id = resp["GroupId"]

    role_name = str(uuid4())[0:6]
    resp = iam_client.create_role(
        RoleName=role_name, AssumeRolePolicyDocument="some_policy"
    )
    iam_arn = resp["Role"]["Arn"]
    iam_client.create_instance_profile(InstanceProfileName=role_name)
    iam_client.add_role_to_instance_profile(
        InstanceProfileName=role_name, RoleName=role_name
    )

    return vpc_id, subnet_id, sg_id, iam_arn
@mock_cloudformation()
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_without_docker
def test_create_env_cf():
    ec2_client, iam_client, _, _, _ = _get_clients()
    _, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    create_environment_template = {
        "Resources": {
            "ComputeEnvironment": {
                "Type": "AWS::Batch::ComputeEnvironment",
                "Properties": {
                    "Type": "MANAGED",
                    "ComputeResources": {
                        "Type": "EC2",
                        "MinvCpus": 0,
                        "DesiredvCpus": 0,
                        "MaxvCpus": 64,
                        "InstanceTypes": ["optimal"],
                        "Subnets": [subnet_id],
                        "SecurityGroupIds": [sg_id],
                        "InstanceRole": iam_arn.replace("role", "instance-profile"),
                    },
                    "ServiceRole": iam_arn,
                },
            }
        }
    }
    cf_json = json.dumps(create_environment_template)

    cf_conn = boto3.client("cloudformation", DEFAULT_REGION)
    stack_name = str(uuid4())[0:6]
    stack_id = cf_conn.create_stack(StackName=stack_name, TemplateBody=cf_json)[
        "StackId"
    ]

    stack_resources = cf_conn.list_stack_resources(StackName=stack_id)

    stack_resources["StackResourceSummaries"][0]["ResourceStatus"].should.equal(
        "CREATE_COMPLETE"
    )
    # Spot checks on the ARN
    assert stack_resources["StackResourceSummaries"][0]["PhysicalResourceId"].startswith(
        "arn:aws:batch:"
    )
    stack_resources["StackResourceSummaries"][0]["PhysicalResourceId"].should.contain(
        stack_name
    )
@mock_cloudformation()
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_without_docker
def test_create_job_queue_cf():
    ec2_client, iam_client, _, _, _ = _get_clients()
    _, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    create_environment_template = {
        "Resources": {
            "ComputeEnvironment": {
                "Type": "AWS::Batch::ComputeEnvironment",
                "Properties": {
                    "Type": "MANAGED",
                    "ComputeResources": {
                        "Type": "EC2",
                        "MinvCpus": 0,
                        "DesiredvCpus": 0,
                        "MaxvCpus": 64,
                        "InstanceTypes": ["optimal"],
                        "Subnets": [subnet_id],
                        "SecurityGroupIds": [sg_id],
                        "InstanceRole": iam_arn.replace("role", "instance-profile"),
                    },
                    "ServiceRole": iam_arn,
                },
            },
            "JobQueue": {
                "Type": "AWS::Batch::JobQueue",
                "Properties": {
                    "Priority": 1,
                    "ComputeEnvironmentOrder": [
                        {
                            "Order": 1,
                            "ComputeEnvironment": {"Ref": "ComputeEnvironment"},
                        }
                    ],
                },
            },
        }
    }
    cf_json = json.dumps(create_environment_template)

    cf_conn = boto3.client("cloudformation", DEFAULT_REGION)
    stack_name = str(uuid4())[0:6]
    stack_id = cf_conn.create_stack(StackName=stack_name, TemplateBody=cf_json)[
        "StackId"
    ]

    stack_resources = cf_conn.list_stack_resources(StackName=stack_id)
    len(stack_resources["StackResourceSummaries"]).should.equal(2)

    job_queue_resource = list(
        filter(
            lambda item: item["ResourceType"] == "AWS::Batch::JobQueue",
            stack_resources["StackResourceSummaries"],
        )
    )[0]

    job_queue_resource["ResourceStatus"].should.equal("CREATE_COMPLETE")
    # Spot checks on the ARN
    assert job_queue_resource["PhysicalResourceId"].startswith("arn:aws:batch:")
    job_queue_resource["PhysicalResourceId"].should.contain(stack_name)
    job_queue_resource["PhysicalResourceId"].should.contain("job-queue/")
@mock_cloudformation
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_without_docker
def test_create_job_def_cf():
    ec2_client, iam_client, _, _, _ = _get_clients()
    _, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    create_environment_template = {
        "Resources": {
            "ComputeEnvironment": {
                "Type": "AWS::Batch::ComputeEnvironment",
                "Properties": {
                    "Type": "MANAGED",
                    "ComputeResources": {
                        "Type": "EC2",
                        "MinvCpus": 0,
                        "DesiredvCpus": 0,
                        "MaxvCpus": 64,
                        "InstanceTypes": ["optimal"],
                        "Subnets": [subnet_id],
                        "SecurityGroupIds": [sg_id],
                        "InstanceRole": iam_arn.replace("role", "instance-profile"),
                    },
                    "ServiceRole": iam_arn,
                },
            },
            "JobQueue": {
                "Type": "AWS::Batch::JobQueue",
                "Properties": {
                    "Priority": 1,
                    "ComputeEnvironmentOrder": [
                        {
                            "Order": 1,
                            "ComputeEnvironment": {"Ref": "ComputeEnvironment"},
                        }
                    ],
                },
            },
            "JobDefinition": {
                "Type": "AWS::Batch::JobDefinition",
                "Properties": {
                    "Type": "container",
                    "ContainerProperties": {
                        "Image": {
                            "Fn::Join": [
                                "",
                                [
                                    "137112412989.dkr.ecr.",
                                    {"Ref": "AWS::Region"},
                                    ".amazonaws.com/amazonlinux:latest",
                                ],
                            ]
                        },
                        "ResourceRequirements": [
                            {"Type": "MEMORY", "Value": 2000},
                            {"Type": "VCPU", "Value": 2},
                        ],
                        "Command": ["echo", "Hello world"],
                        "LinuxParameters": {"Devices": [{"HostPath": "test-path"}]},
                    },
                    "RetryStrategy": {"Attempts": 1},
                },
            },
        }
    }
    cf_json = json.dumps(create_environment_template)

    cf_conn = boto3.client("cloudformation", DEFAULT_REGION)
    stack_name = str(uuid4())[0:6]
    stack_id = cf_conn.create_stack(StackName=stack_name, TemplateBody=cf_json)[
        "StackId"
    ]

    stack_resources = cf_conn.list_stack_resources(StackName=stack_id)
    len(stack_resources["StackResourceSummaries"]).should.equal(3)

    job_def_resource = list(
        filter(
            lambda item: item["ResourceType"] == "AWS::Batch::JobDefinition",
            stack_resources["StackResourceSummaries"],
        )
    )[0]

    job_def_resource["ResourceStatus"].should.equal("CREATE_COMPLETE")
    # Spot checks on the ARN
    assert job_def_resource["PhysicalResourceId"].startswith("arn:aws:batch:")
    job_def_resource["PhysicalResourceId"].should.contain(f"{stack_name}-JobDef")
    job_def_resource["PhysicalResourceId"].should.contain("job-definition/")

    # Test the linux parameter device host path
    # This ensures that batch is parsing the parameter dictionaries
    # correctly by recursively converting the first character of all
    # dict keys to lowercase.
    batch_conn = boto3.client("batch", DEFAULT_REGION)
    response = batch_conn.describe_job_definitions(
        jobDefinitions=[job_def_resource["PhysicalResourceId"]]
    )
    job_def_linux_device_host_path = response.get("jobDefinitions")[0][
        "containerProperties"
    ]["linuxParameters"]["devices"][0]["hostPath"]
    job_def_linux_device_host_path.should.equal("test-path")
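The "recursively converting the first character of all dict keys to lowercase" behaviour checked above can be pictured with a sketch like the following (a plausible implementation for illustration, not necessarily moto's exact helper):

def lowercase_first_key(data):
    """Recursively lowercase the first character of every dict key."""
    if isinstance(data, dict):
        return {
            key[0].lower() + key[1:]: lowercase_first_key(value)
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [lowercase_first_key(item) for item in data]
    return data


# CloudFormation-style keys become boto3/batch-style keys:
assert lowercase_first_key(
    {"LinuxParameters": {"Devices": [{"HostPath": "test-path"}]}}
) == {"linuxParameters": {"devices": [{"hostPath": "test-path"}]}}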

View File

@@ -0,0 +1,101 @@
from ..test_batch import _get_clients, _setup
import sure # noqa # pylint: disable=unused-import
from moto import mock_batch_simple, mock_iam, mock_ec2, mock_ecs, settings
from uuid import uuid4
# Copy of test_batch/test_batch_compute_envs
# Except that we verify this behaviour still works without docker
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_create_managed_compute_environment():
    ec2_client, iam_client, ecs_client, _, batch_client = _get_clients()
    _, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = str(uuid4())
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="MANAGED",
        state="ENABLED",
        computeResources={
            "type": "EC2",
            "minvCpus": 5,
            "maxvCpus": 10,
            "desiredvCpus": 5,
            "instanceTypes": ["t2.small", "t2.medium"],
            "imageId": "some_image_id",
            "subnets": [subnet_id],
            "securityGroupIds": [sg_id],
            "ec2KeyPair": "string",
            "instanceRole": iam_arn.replace("role", "instance-profile"),
            "tags": {"string": "string"},
            "bidPercentage": 123,
            "spotIamFleetRole": "string",
        },
        serviceRole=iam_arn,
    )
    resp.should.contain("computeEnvironmentArn")
    resp["computeEnvironmentName"].should.equal(compute_name)

    our_env = batch_client.describe_compute_environments(
        computeEnvironments=[compute_name]
    )["computeEnvironments"][0]

    # A t2.medium has 2 vCPUs and a t2.small has 1, so the 5 desired vCPUs
    # should be satisfied by 2 mediums and 1 small (3 instances in total)
    if not settings.TEST_SERVER_MODE:
        # Can't verify this in ServerMode, as other tests may have created instances
        resp = ec2_client.describe_instances()
        resp.should.contain("Reservations")
        len(resp["Reservations"]).should.equal(3)

    # Should have created 1 ECS cluster
    all_clusters = ecs_client.list_clusters()["clusterArns"]
    all_clusters.should.contain(our_env["ecsClusterArn"])
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_create_managed_compute_environment_with_instance_family():
    """
    The InstanceType parameter can have multiple values:
    instance_type    t2.small
    instance_family  t2    <-- What we're testing here
    'optimal'
    unknown value
    """
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = str(uuid4())
    batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="MANAGED",
        state="ENABLED",
        computeResources={
            "type": "EC2",
            "minvCpus": 5,
            "maxvCpus": 10,
            "desiredvCpus": 5,
            "instanceTypes": ["t2"],
            "imageId": "some_image_id",
            "subnets": [subnet_id],
            "securityGroupIds": [sg_id],
            "ec2KeyPair": "string",
            "instanceRole": iam_arn.replace("role", "instance-profile"),
            "tags": {"string": "string"},
            "bidPercentage": 123,
            "spotIamFleetRole": "string",
        },
        serviceRole=iam_arn,
    )

    our_env = batch_client.describe_compute_environments(
        computeEnvironments=[compute_name]
    )["computeEnvironments"][0]
    our_env["computeResources"]["instanceTypes"].should.equal(["t2"])

View File

@@ -0,0 +1,112 @@
from ..test_batch import _get_clients, _setup
import sure # noqa # pylint: disable=unused-import
from moto import mock_iam, mock_ec2, mock_ecs, mock_logs
from moto import mock_batch_simple
from uuid import uuid4
# Copy of test_batch/test_batch_jobs
# Except that we verify this behaviour still works without docker
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_submit_job_by_name():
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    compute_name = str(uuid4())
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName=str(uuid4()),
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    queue_arn = resp["jobQueueArn"]

    job_definition_name = f"sleep10_{str(uuid4())[0:6]}"
    resp = batch_client.register_job_definition(
        jobDefinitionName=job_definition_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 512,
            "command": ["sleep", "10"],
        },
    )
    job_definition_arn = resp["jobDefinitionArn"]

    resp = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_definition_name
    )
    job_id = resp["jobId"]

    resp_jobs = batch_client.describe_jobs(jobs=[job_id])
    len(resp_jobs["jobs"]).should.equal(1)
    job = resp_jobs["jobs"][0]

    job["jobId"].should.equal(job_id)
    job["jobQueue"].should.equal(queue_arn)
    job["jobDefinition"].should.equal(job_definition_arn)
    job["status"].should.equal("SUCCEEDED")
@mock_batch_simple
def test_update_job_definition():
    _, _, _, _, batch_client = _get_clients()

    tags = [
        {"Foo1": "bar1", "Baz1": "buzz1"},
        {"Foo2": "bar2", "Baz2": "buzz2"},
    ]

    container_props = {
        "image": "amazonlinux",
        "memory": 1024,
        "vcpus": 2,
    }

    job_def_name = str(uuid4())[0:6]
    batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        tags=tags[0],
        parameters={},
        containerProperties=container_props,
    )

    container_props["memory"] = 2048
    batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        tags=tags[1],
        parameters={},
        containerProperties=container_props,
    )

    job_defs = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)[
        "jobDefinitions"
    ]
    job_defs.should.have.length_of(2)

    job_defs[0]["containerProperties"]["memory"].should.equal(1024)
    job_defs[0]["tags"].should.equal(tags[0])
    job_defs[0].shouldnt.have.key("timeout")

    job_defs[1]["containerProperties"]["memory"].should.equal(2048)
    job_defs[1]["tags"].should.equal(tags[1])