BatchSimple: jobs can fail or succeed (#6205)

This commit is contained in:
rafcio19 2023-04-13 20:17:49 +01:00 committed by GitHub
parent ad20320d41
commit de76cc5ab0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 167 additions and 45 deletions

View File

@ -0,0 +1,7 @@
# batch_simple
Batch jobs run under `batch_simple` will succeed by default. To make the jobs fail:
1. set environment variable `MOTO_SIMPLE_BATCH_FAIL_AFTER=0` OR
2. set environment variable `MOTO_SIMPLE_BATCH_FAIL_AFTER` to an integer value
to make it fail after that number of seconds, ie `MOTO_SIMPLE_BATCH_FAIL_AFTER=4` will fail after 4 seconds

View File

@ -8,12 +8,16 @@ from ..batch.models import (
from ..core import BackendDict
import datetime
from os import getenv
from time import sleep
from typing import Any, Dict, List, Tuple, Optional
class BatchSimpleBackend(BaseBackend):
"""
Implements a Batch-Backend that does not use Docker containers. Submitted Jobs are simply marked as Success
Implements a Batch-Backend that does not use Docker containers. Submitted Jobs are marked as Success or Fail,
depending on the presence and value of MOTO_SIMPLE_BATCH_FAIL_AFTER env var
Annotate your tests with `@mock_batch_simple`-decorator to use this Batch-implementation.
"""
@ -77,11 +81,27 @@ class BatchSimpleBackend(BaseBackend):
)
self.backend._jobs[job.job_id] = job
# We don't want to actually run the job - just mark it as succeeded
job.job_started_at = datetime.datetime.now()
job.log_stream_name = job._stream_name
job._start_attempt()
job._mark_stopped(success=True)
# We don't want to actually run the job - just mark it as succeeded or failed
# depending on whether env var MOTO_SIMPLE_BATCH_FAIL_AFTER is set
# if MOTO_SIMPLE_BATCH_FAIL_AFTER is set to an integer then batch will
# sleep this many seconds
should_batch_fail = getenv("MOTO_SIMPLE_BATCH_FAIL_AFTER")
if should_batch_fail:
try:
batch_fail_delay = int(should_batch_fail)
sleep(batch_fail_delay)
except ValueError:
# Unable to parse value of MOTO_SIMPLE_BATCH_FAIL_AFTER as an integer
pass
# fail the job
job._mark_stopped(success=False)
else:
job._mark_stopped(success=True)
return job_name, job.job_id

View File

@ -1,10 +1,10 @@
from ..test_batch import _get_clients, _setup
import sure # noqa # pylint: disable=unused-import
import os
from moto import mock_iam, mock_ec2, mock_ecs, mock_logs, settings
from moto import mock_batch_simple
from uuid import uuid4
from unittest import SkipTest
from unittest import mock, SkipTest
# Copy of test_batch/test_batch_jobs
@ -17,42 +17,10 @@ from unittest import SkipTest
@mock_iam
@mock_batch_simple
def test_submit_job_by_name():
if settings.TEST_SERVER_MODE:
raise SkipTest("No point in testing batch_simple in ServerMode")
ec2_client, iam_client, _, _, batch_client = _get_clients()
_, _, _, iam_arn = _setup(ec2_client, iam_client)
compute_name = str(uuid4())
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName=str(uuid4()),
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
job_definition_name = f"sleep10_{str(uuid4())[0:6]}"
resp = batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type="container",
containerProperties={
"image": "busybox",
"vcpus": 1,
"memory": 512,
"command": ["sleep", "10"],
},
batch_client, job_definition_arn, queue_arn = setup_common_batch_simple(
job_definition_name
)
job_definition_arn = resp["jobDefinitionArn"]
resp = batch_client.submit_job(
jobName="test1", jobQueue=queue_arn, jobDefinition=job_definition_name
@ -109,11 +77,138 @@ def test_update_job_definition():
job_defs = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)[
"jobDefinitions"
]
job_defs.should.have.length_of(2)
assert len(job_defs) == 2
job_defs[0]["containerProperties"]["memory"].should.equal(1024)
job_defs[0]["tags"].should.equal(tags[0])
job_defs[0].shouldnt.have.key("timeout")
assert job_defs[0]["containerProperties"]["memory"] == 1024
assert job_defs[0]["tags"] == tags[0]
assert "timeout" not in job_defs[0]
job_defs[1]["containerProperties"]["memory"].should.equal(2048)
job_defs[1]["tags"].should.equal(tags[1])
assert job_defs[1]["containerProperties"]["memory"] == 2048
assert job_defs[1]["tags"] == tags[1]
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_submit_job_fail():
job_definition_name = "test_job_moto_fail"
with mock.patch.dict(os.environ, {"MOTO_SIMPLE_BATCH_FAIL_AFTER": "0"}):
batch_client, _, queue_arn = setup_common_batch_simple(job_definition_name)
resp = batch_client.submit_job(
jobName=job_definition_name,
jobQueue=queue_arn,
jobDefinition=job_definition_name,
)
job_id = resp["jobId"]
resp_jobs = batch_client.describe_jobs(jobs=[job_id])
assert len(resp_jobs["jobs"]) == 1
job = resp_jobs["jobs"][0]
assert job["jobId"] == job_id
assert job["status"] == "FAILED"
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_submit_job_fail_after_1_secs():
job_definition_name = "test_job_moto_fail"
with mock.patch.dict(os.environ, {"MOTO_SIMPLE_BATCH_FAIL_AFTER": "1"}):
batch_client, _, queue_arn = setup_common_batch_simple(job_definition_name)
resp = batch_client.submit_job(
jobName=job_definition_name,
jobQueue=queue_arn,
jobDefinition=job_definition_name,
)
job_id = resp["jobId"]
resp_jobs = batch_client.describe_jobs(jobs=[job_id])
assert len(resp_jobs["jobs"]) == 1
job = resp_jobs["jobs"][0]
assert job["jobId"] == job_id
assert job["status"] == "FAILED"
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def test_submit_job_fail_bad_int():
job_definition_name = "test_job_moto_fail"
with mock.patch.dict(
os.environ, {"MOTO_SIMPLE_BATCH_FAIL_AFTER": "CANT_PARSE_AS_INT"}
):
batch_client, _, queue_arn = setup_common_batch_simple(job_definition_name)
resp = batch_client.submit_job(
jobName=job_definition_name,
jobQueue=queue_arn,
jobDefinition=job_definition_name,
)
job_id = resp["jobId"]
resp_jobs = batch_client.describe_jobs(jobs=[job_id])
assert len(resp_jobs["jobs"]) == 1
job = resp_jobs["jobs"][0]
assert job["jobId"] == job_id
assert job["status"] == "FAILED"
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch_simple
def setup_common_batch_simple(job_definition_name):
if settings.TEST_SERVER_MODE:
raise SkipTest("No point in testing batch_simple in ServerMode")
ec2_client, iam_client, _, _, batch_client = _get_clients()
_, _, _, iam_arn = _setup(ec2_client, iam_client)
compute_name = str(uuid4())
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type="UNMANAGED",
state="ENABLED",
serviceRole=iam_arn,
)
arn = resp["computeEnvironmentArn"]
resp = batch_client.create_job_queue(
jobQueueName=str(uuid4()),
state="ENABLED",
priority=123,
computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
)
queue_arn = resp["jobQueueArn"]
resp = batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type="container",
containerProperties={
"image": "busybox",
"vcpus": 1,
"memory": 512,
"command": ["sleep", "10"],
},
)
job_definition_arn = resp["jobDefinitionArn"]
return batch_client, job_definition_arn, queue_arn