2021-08-04 12:40:10 +00:00
|
|
|
from . import _get_clients, _setup
|
2017-09-26 16:37:26 +00:00
|
|
|
|
2017-10-06 00:21:29 +00:00
|
|
|
import datetime
|
2017-09-26 16:37:26 +00:00
|
|
|
import sure # noqa
|
2017-10-06 00:21:29 +00:00
|
|
|
from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs
|
2020-10-06 06:04:09 +00:00
|
|
|
import pytest
|
2021-08-04 12:40:10 +00:00
|
|
|
import time
|
2021-10-05 17:11:07 +00:00
|
|
|
from uuid import uuid4
|
2019-12-13 05:07:29 +00:00
|
|
|
|
2017-10-06 00:21:29 +00:00
|
|
|
|
2019-08-06 20:13:52 +00:00
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_submit_job_by_name():
    """Submit a job using only the job definition *name*.

    The same definition name is registered three times; the test expects the
    submitted job to report the ARN returned by the last registration.
    """
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    # Unmanaged compute environment backing the queue.
    compute_name = str(uuid4())
    environment = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    environment_arn = environment["computeEnvironmentArn"]

    queue = batch_client.create_job_queue(
        jobQueueName=str(uuid4()),
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[
            {"order": 123, "computeEnvironment": environment_arn}
        ],
    )
    queue_arn = queue["jobQueueArn"]

    job_definition_name = f"sleep10_{str(uuid4())[0:6]}"

    # Register the same name three times, varying only the memory setting;
    # only the response of the final registration is kept.
    registration = None
    for memory in (128, 256, 512):
        registration = batch_client.register_job_definition(
            jobDefinitionName=job_definition_name,
            type="container",
            containerProperties={
                "image": "busybox",
                "vcpus": 1,
                "memory": memory,
                "command": ["sleep", "10"],
            },
        )
    job_definition_arn = registration["jobDefinitionArn"]

    # Note: the definition is referenced by name here, not by ARN.
    submission = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_definition_name
    )
    job_id = submission["jobId"]

    described = batch_client.describe_jobs(jobs=[job_id])["jobs"]

    len(described).should.equal(1)
    job = described[0]
    job["jobId"].should.equal(job_id)
    job["jobQueue"].should.equal(queue_arn)
    job["jobDefinition"].should.equal(job_definition_arn)
|
|
|
|
|
2019-08-06 20:13:52 +00:00
|
|
|
|
2017-10-11 22:46:27 +00:00
|
|
|
# SLOW TESTS
|
2019-05-15 15:04:31 +00:00
|
|
|
|
2020-03-12 14:07:34 +00:00
|
|
|
|
2017-10-06 00:21:29 +00:00
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.network
def test_submit_job():
    """Run an ``echo hello`` job to completion and check its single log event."""
    ec2_client, iam_client, _, logs_client, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    definition_name = str(uuid4())[0:6]
    job_def_arn, queue_arn = prepare_job(
        batch_client, ["echo", "hello"], iam_arn, definition_name
    )

    submission = batch_client.submit_job(
        jobName=str(uuid4())[0:6], jobQueue=queue_arn, jobDefinition=job_def_arn
    )
    job_id = submission["jobId"]

    _wait_for_job_status(batch_client, job_id, "SUCCEEDED")

    # Exactly one log stream is expected under the definition-name prefix.
    log_group = "/aws/batch/job"
    streams = logs_client.describe_log_streams(
        logGroupName=log_group, logStreamNamePrefix=definition_name
    )
    streams["logStreams"].should.have.length_of(1)
    stream_name = streams["logStreams"][0]["logStreamName"]

    log_events = logs_client.get_log_events(
        logGroupName=log_group, logStreamName=stream_name
    )
    messages = [event["message"] for event in log_events["events"]]
    messages.should.equal(["hello"])
|
2017-10-11 22:46:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.network
def test_list_jobs():
    """List SUCCEEDED jobs before and after two ``sleep 5`` jobs finish."""
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_arn, queue_arn = prepare_job(
        batch_client, ["sleep", "5"], iam_arn, "sleep5"
    )

    # Submit both jobs back to back, remembering their ids in order.
    submitted_ids = []
    for job_name in ("test1", "test2"):
        submission = batch_client.submit_job(
            jobName=job_name, jobQueue=queue_arn, jobDefinition=job_def_arn
        )
        submitted_ids.append(submission["jobId"])

    # Queried right after submission: no job is expected to be listed yet.
    listed = batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")
    listed["jobSummaryList"].should.have.length_of(0)

    # Wait only as long as it takes to run the jobs
    for job_id in submitted_ids:
        _wait_for_job_status(batch_client, job_id, "SUCCEEDED")

    listed = batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")
    listed["jobSummaryList"].should.have.length_of(2)
|
2017-10-11 22:46:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_terminate_job():
    """Terminate a running job and verify status, reason and partial logs."""
    ec2_client, iam_client, _, logs_client, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_name = f"echo-sleep-echo-{str(uuid4())[0:6]}"
    # A long sleep sits between the two echoes so there is time to terminate
    # the job after "start" but before "stop".
    shell_command = ["sh", "-c", "echo start && sleep 30 && echo stop"]
    job_def_arn, queue_arn = prepare_job(
        batch_client, shell_command, iam_arn, job_def_name
    )

    submission = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
    )
    job_id = submission["jobId"]

    _wait_for_job_status(batch_client, job_id, "RUNNING")
    batch_client.terminate_job(jobId=job_id, reason="test_terminate")
    _wait_for_job_status(batch_client, job_id, "FAILED")

    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    job["jobName"].should.equal("test1")
    job["status"].should.equal("FAILED")
    job["statusReason"].should.equal("test_terminate")

    stream_name = f"{job_def_name}/default/{job_id}"
    log_events = logs_client.get_log_events(
        logGroupName="/aws/batch/job", logStreamName=stream_name
    )["events"]

    # Only 'start' should have been logged: the job was interrupted before
    # it could write 'stop'.
    log_events.should.have.length_of(1)
    log_events[0]["message"].should.equal("start")
|
|
|
|
|
|
|
|
|
2021-08-22 11:29:23 +00:00
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_cancel_pending_job():
    """Cancel a job that has not started yet and check it ends up FAILED."""
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    # To cancel a job it must not have started yet. Locally jobs start too
    # quickly for that, so the job under test is made to depend on another,
    # slower job, which holds it back.
    job_def_arn, queue_arn = prepare_job(
        batch_client, ["sleep", "1"], iam_arn, "deptest"
    )

    blocker = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
    )
    blocker_id = blocker["jobId"]

    dependent = batch_client.submit_job(
        jobName="test_job_name",
        jobQueue=queue_arn,
        jobDefinition=job_def_arn,
        dependsOn=[{"jobId": blocker_id, "type": "SEQUENTIAL"}],
    )
    job_id = dependent["jobId"]

    batch_client.cancel_job(jobId=job_id, reason="test_cancel")
    _wait_for_job_status(batch_client, job_id, "FAILED", seconds_to_wait=10)

    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    job["jobName"].should.equal("test_job_name")
    job["statusReason"].should.equal("test_cancel")
|
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_cancel_running_job():
    """
    Once a job has started, cancelling it no longer has any effect:
    the job still succeeds and carries no status reason.
    """
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_arn, queue_arn = prepare_job(
        batch_client, ["echo", "start"], iam_arn, "echo-o-o"
    )

    submission = batch_client.submit_job(
        jobName="test_job_name", jobQueue=queue_arn, jobDefinition=job_def_arn
    )
    job_id = submission["jobId"]
    _wait_for_job_status(batch_client, job_id, "STARTING")

    batch_client.cancel_job(jobId=job_id, reason="test_cancel")
    # The cancel came too late, the job was already underway; all that is
    # left to do is wait for it to succeed.
    _wait_for_job_status(batch_client, job_id, "SUCCEEDED", seconds_to_wait=5)

    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    job["jobName"].should.equal("test_job_name")
    job.shouldnt.have.key("statusReason")
|
|
|
|
|
|
|
|
|
Fix Race Condition in batch:SubmitJob (#3480)
* Extract Duplicate Code into Helper Method
DRY up the tests and replace the arbitrary `sleep()` calls with a more
explicit check before progressing.
* Improve Testing of batch:TerminateJob
The test now confirms that the job was terminated by sandwiching a `sleep`
command between two `echo` commands. In addition to the original checks
of the terminated job status/reason, the test now asserts that only the
first echo command succeeded, confirming that the job was indeed terminated
while in progress.
* Fix Race Condition in batch:SubmitJob
The `test_submit_job` in `test_batch.py` kicks off a job, calls `describe_jobs`
in a loop until the job status returned is SUCCEEDED, and then asserts against
the logged events.
The backend code that runs the submitted job does so in a separate thread. If
the job was successful, the job status was being set to SUCCEEDED *before* the
event logs had been written to the logging backend.
As a result, it was possible for the primary thread running the test to detect
that the job was successful immediately after the secondary thread had updated
the job status but before the secondary thread had written the logs to the
logging backend. Under the right conditions, this could cause the subsequent
logging assertions in the primary thread to fail.
Additionally, the code that collected the logs from the container was using
a "dodgy hack" of time.sleep() and a modulo-based conditional that was
ultimately non-deterministic and could result in log messages being dropped
or duplicated in certain scenarios.
In order to address these issues, this commit does the following:
* Carefully re-orders any code that sets a job status or timestamp
to avoid any obvious race conditions.
* Removes the "dodgy hack" in favor of a much more straightforward
(and less error-prone) method of collecting logs from the container.
* Removes arbitrary and unnecessary calls to time.sleep()
Before applying any changes, the flaky test was failing about 12% of the
time. Putting a sleep() call between setting the `job_status` to SUCCEEDED
and collecting the logs, resulted in a 100% failure rate. Simply moving
the code that sets the job status to SUCCEEDED to the end of the code block,
dropped the failure rate to ~2%. Finally, removing the log collection
hack allowed the test suite to run ~1000 times without a single failure.
Taken in aggregate, these changes make the batch backend more deterministic
and should put the nail in the coffin of this flaky test.
Closes #3475
2020-11-18 10:49:25 +00:00
|
|
|
def _wait_for_job_status(client, job_id, status, seconds_to_wait=30):
    """Poll ``describe_jobs`` until the job reports ``status`` or time runs out.

    Args:
        client: Object exposing ``describe_jobs(jobs=[...])`` whose response
            is indexable as ``resp["jobs"][0]["status"]``.
        job_id: Id of the job to poll; passed as the only element of ``jobs``.
        status: Status string to wait for (compared with ``==``).
        seconds_to_wait: Time budget in seconds. With a budget of zero or
            less no poll is made at all.

    Returns:
        None, as soon as a poll reports ``status``.

    Raises:
        RuntimeError: If the budget elapses first. The message includes the
            last status seen (``None`` if no poll was made).
    """
    # A monotonic clock keeps the timeout independent of wall-clock
    # adjustments (NTP steps, DST changes) while the test is running.
    deadline = time.monotonic() + seconds_to_wait
    last_job_status = None
    # NOTE(review): there is deliberately no sleep between polls. Callers in
    # this file wait on states such as "STARTING", which are presumably
    # short-lived and could be missed by a coarser poll - confirm before
    # adding a delay here.
    while time.monotonic() < deadline:
        resp = client.describe_jobs(jobs=[job_id])
        last_job_status = resp["jobs"][0]["status"]
        if last_job_status == status:
            return
    raise RuntimeError(
        f"Time out waiting for job status {status}!\n Last status: {last_job_status}"
    )
|
2021-05-26 07:52:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_failed_job():
    """Submit an ``exit 1`` job and expect it to reach FAILED within 30 seconds."""
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_arn, queue_arn = prepare_job(
        batch_client, ["exit", "1"], iam_arn, "exit-1"
    )

    submission = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
    )
    job_id = submission["jobId"]

    deadline = datetime.datetime.now() + datetime.timedelta(seconds=30)

    # Poll twice a second. Unlike a plain wait-for-status, a SUCCEEDED job is
    # reported immediately as an error instead of waiting for the timeout.
    while datetime.datetime.now() < deadline:
        job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]

        if job["status"] == "FAILED":
            break
        if job["status"] == "SUCCEEDED":
            raise RuntimeError("Batch job succeeded even though it had exit code 1")
        time.sleep(0.5)
    else:
        # Loop ran out of time without ever hitting the break above.
        raise RuntimeError("Batch job timed out")
|
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_dependencies():
    """Run two ``echo hello`` jobs plus a third that depends on both.

    All three jobs must reach SUCCEEDED within 30 seconds, and each must have
    a log stream under ``/aws/batch/job`` whose only message is ``hello``.
    """
    ec2_client, iam_client, _, logs_client, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    job_def_arn, queue_arn = prepare_job(
        batch_client,
        commands=["echo", "hello"],
        iam_arn=iam_arn,
        job_def_name="dependencytest",
    )

    # Two independent jobs first, submitted in order.
    job_id1, job_id2 = [
        batch_client.submit_job(
            jobName=job_name, jobQueue=queue_arn, jobDefinition=job_def_arn
        )["jobId"]
        for job_name in ("test1", "test2")
    ]

    # The third job lists both earlier jobs as SEQUENTIAL dependencies.
    job_id3 = batch_client.submit_job(
        jobName="test3",
        jobQueue=queue_arn,
        jobDefinition=job_def_arn,
        dependsOn=[
            {"jobId": job_id1, "type": "SEQUENTIAL"},
            {"jobId": job_id2, "type": "SEQUENTIAL"},
        ],
    )["jobId"]
    job_ids = [job_id1, job_id2, job_id3]

    deadline = datetime.datetime.now() + datetime.timedelta(seconds=30)
    while True:
        if datetime.datetime.now() >= deadline:
            raise RuntimeError("Batch job timed out")

        described = batch_client.describe_jobs(jobs=job_ids)
        statuses = [job["status"] for job in described["jobs"]]
        if "FAILED" in statuses:
            raise RuntimeError("Batch job failed")
        if all(job_status == "SUCCEEDED" for job_status in statuses):
            break
        time.sleep(0.5)

    log_group_name = "/aws/batch/job"
    expected_stream_names = [f"dependencytest/default/{_id}" for _id in job_ids]

    # Only look at the streams that belong to the three jobs submitted above.
    matching_stream_names = [
        stream["logStreamName"]
        for stream in retrieve_all_streams(log_group_name, logs_client)
        if stream["logStreamName"] in expected_stream_names
    ]
    for stream_name in matching_stream_names:
        events = logs_client.get_log_events(
            logGroupName=log_group_name, logStreamName=stream_name
        )["events"]
        [event["message"] for event in events].should.equal(["hello"])

    len(matching_stream_names).should.equal(3)
|
|
|
|
|
|
|
|
|
|
|
|
def retrieve_all_streams(log_stream_name, logs_client):
    """Return every log stream entry of a log group, following pagination.

    :param log_stream_name: value passed as ``logGroupName`` to
        ``describe_log_streams`` (despite the parameter name).
    :param logs_client: object exposing ``describe_log_streams(**kwargs)``.
    :return: the ``logStreams`` list of the first page, extended in place with
        the entries of every following page.
    """
    request = {"logGroupName": log_stream_name}
    collected = None
    while True:
        page = logs_client.describe_log_streams(**request)
        if collected is None:
            collected = page["logStreams"]
        else:
            collected.extend(page["logStreams"])

        cursor = page.get("nextToken")
        if not cursor:
            # A missing or empty token marks the last page.
            return collected
        request["nextToken"] = cursor
|
|
|
|
|
2021-05-26 07:52:09 +00:00
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_failed_dependencies():
    """A job that depends on a failing job must itself end up FAILED.

    Job 1 runs ``echo hello``, job 2 runs ``exit 1`` and job 3 depends on
    both. Jobs 2 and 3 are polled every half second for up to 30 seconds:
    neither may ever report SUCCEEDED, and the test finishes as soon as job 3
    reports FAILED. Running out of time raises ``RuntimeError``.
    """
    ec2_client, iam_client, _, _, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    compute_name = str(uuid4())[0:6]
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName=str(uuid4())[0:6],
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    queue_arn = resp["jobQueueArn"]

    resp = batch_client.register_job_definition(
        jobDefinitionName="sayhellotomylittlefriend",
        type="container",
        containerProperties={
            "image": "busybox:latest",
            "vcpus": 1,
            "memory": 128,
            "command": ["echo", "hello"],
        },
    )
    job_def_arn_success = resp["jobDefinitionArn"]

    resp = batch_client.register_job_definition(
        jobDefinitionName="sayhellotomylittlefriend_failed",
        type="container",
        containerProperties={
            "image": "busybox:latest",
            "vcpus": 1,
            "memory": 128,
            # Same failing command as test_failed_job (was misspelled "exi1").
            "command": ["exit", "1"],
        },
    )
    job_def_arn_failure = resp["jobDefinitionArn"]

    resp = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn_success
    )
    job_id1 = resp["jobId"]

    resp = batch_client.submit_job(
        jobName="test2", jobQueue=queue_arn, jobDefinition=job_def_arn_failure
    )
    job_id2 = resp["jobId"]

    depends_on = [
        {"jobId": job_id1, "type": "SEQUENTIAL"},
        {"jobId": job_id2, "type": "SEQUENTIAL"},
    ]
    resp = batch_client.submit_job(
        jobName="test3",
        jobQueue=queue_arn,
        jobDefinition=job_def_arn_success,
        dependsOn=depends_on,
    )
    job_id3 = resp["jobId"]

    future = datetime.datetime.now() + datetime.timedelta(seconds=30)

    # Query batch jobs until all jobs have run.
    # Job 2 is supposed to fail and in consequence Job 3 should never run
    # and status should change directly from PENDING to FAILED
    while datetime.datetime.now() < future:
        # NOTE(review): the indexing below assumes describe_jobs returns the
        # jobs in the order they were requested - confirm.
        resp = batch_client.describe_jobs(jobs=[job_id2, job_id3])

        assert resp["jobs"][0]["status"] != "SUCCEEDED", "Job 2 cannot succeed"
        assert resp["jobs"][1]["status"] != "SUCCEEDED", "Job 3 cannot succeed"

        if resp["jobs"][1]["status"] == "FAILED":
            break

        time.sleep(0.5)
    else:
        raise RuntimeError("Batch job timed out")
|
|
|
|
|
|
|
|
|
|
|
|
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_container_overrides():
    """
    Test if container overrides have any effect.
    Overwrites should be reflected in container description.
    Environment variables should be accessible inside docker container

    The job definition sets TEST0 and TEST1; the submitted job overrides
    vcpus, memory, the command (``printenv``) and TEST0, and adds TEST2.
    The assertions check both the ``describe_jobs`` container description
    and the environment lines found in the ``/aws/batch/job`` log events.
    """
    # Set up environment
    ec2_client, iam_client, _, logs_client, batch_client = _get_clients()
    _, _, _, iam_arn = _setup(ec2_client, iam_client)

    compute_name = str(uuid4())[0:6]
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName=str(uuid4())[0:6],
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    queue_arn = resp["jobQueueArn"]

    job_definition_name = f"sleep10_{str(uuid4())[0:6]}"

    # Set up Job Definition
    # We will then override the container properties in the actual job
    resp = batch_client.register_job_definition(
        jobDefinitionName=job_definition_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 512,
            "command": ["sleep", "10"],
            "environment": [
                {"name": "TEST0", "value": "from job definition"},
                {"name": "TEST1", "value": "from job definition"},
            ],
        },
    )

    job_definition_arn = resp["jobDefinitionArn"]

    # The Job to run, including container overrides
    resp = batch_client.submit_job(
        jobName="test1",
        jobQueue=queue_arn,
        jobDefinition=job_definition_name,
        containerOverrides={
            "vcpus": 2,
            "memory": 1024,
            "command": ["printenv"],
            "environment": [
                {"name": "TEST0", "value": "from job"},
                {"name": "TEST2", "value": "from job"},
            ],
        },
    )

    job_id = resp["jobId"]

    # Wait until Job finishes
    future = datetime.datetime.now() + datetime.timedelta(seconds=30)

    while datetime.datetime.now() < future:
        resp_jobs = batch_client.describe_jobs(jobs=[job_id])

        if resp_jobs["jobs"][0]["status"] == "FAILED":
            raise RuntimeError("Batch job failed")
        if resp_jobs["jobs"][0]["status"] == "SUCCEEDED":
            break
        time.sleep(0.5)
    else:
        raise RuntimeError("Batch job timed out")

    # Getting the log stream to read out env variables inside container
    resp = logs_client.describe_log_streams(logGroupName="/aws/batch/job")

    env_var = []
    for stream in resp["logStreams"]:
        ls_name = stream["logStreamName"]

        stream_resp = logs_client.get_log_events(
            logGroupName="/aws/batch/job", logStreamName=ls_name
        )

        for event in stream_resp["events"]:
            message = event["message"]
            if "TEST" in message or "AWS" in message:
                # Split on the first "=" only, so values that themselves
                # contain "=" stay intact; lines without "=" are not
                # NAME=value pairs and are skipped instead of raising.
                key, separator, value = message.partition("=")
                if separator:
                    env_var.append({"name": key, "value": value})

    len(resp_jobs["jobs"]).should.equal(1)
    resp_jobs["jobs"][0]["jobId"].should.equal(job_id)
    resp_jobs["jobs"][0]["jobQueue"].should.equal(queue_arn)
    resp_jobs["jobs"][0]["jobDefinition"].should.equal(job_definition_arn)
    resp_jobs["jobs"][0]["container"]["vcpus"].should.equal(2)
    resp_jobs["jobs"][0]["container"]["memory"].should.equal(1024)
    resp_jobs["jobs"][0]["container"]["command"].should.equal(["printenv"])

    # Environment as reported by describe_jobs: job-level values win over the
    # job definition, and untouched definition values are kept.
    sure.expect(resp_jobs["jobs"][0]["container"]["environment"]).to.contain(
        {"name": "TEST0", "value": "from job"}
    )
    sure.expect(resp_jobs["jobs"][0]["container"]["environment"]).to.contain(
        {"name": "TEST1", "value": "from job definition"}
    )
    sure.expect(resp_jobs["jobs"][0]["container"]["environment"]).to.contain(
        {"name": "TEST2", "value": "from job"}
    )
    sure.expect(resp_jobs["jobs"][0]["container"]["environment"]).to.contain(
        {"name": "AWS_BATCH_JOB_ID", "value": job_id}
    )

    # Environment as printed by ``printenv`` inside the container.
    sure.expect(env_var).to.contain({"name": "TEST0", "value": "from job"})
    sure.expect(env_var).to.contain({"name": "TEST1", "value": "from job definition"})
    sure.expect(env_var).to.contain({"name": "TEST2", "value": "from job"})

    sure.expect(env_var).to.contain({"name": "AWS_BATCH_JOB_ID", "value": job_id})
|
2021-08-22 11:29:23 +00:00
|
|
|
|
|
|
|
|
|
|
|
def prepare_job(batch_client, commands, iam_arn, job_def_name):
    """Create a compute environment, a job queue and a job definition.

    The compute environment and the queue get random six-character names.
    The job definition is a ``busybox:latest`` container with 1 vcpu and
    128 memory that runs ``commands``.

    :param batch_client: client exposing ``create_compute_environment``,
        ``create_job_queue`` and ``register_job_definition``.
    :param commands: container command, passed through unchanged.
    :param iam_arn: service role for the compute environment.
    :param job_def_name: name of the job definition to register.
    :return: tuple ``(job_definition_arn, job_queue_arn)``.
    """
    environment_arn = batch_client.create_compute_environment(
        computeEnvironmentName=str(uuid4())[0:6],
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )["computeEnvironmentArn"]

    environment_order = [{"order": 123, "computeEnvironment": environment_arn}]
    queue_arn = batch_client.create_job_queue(
        jobQueueName=str(uuid4())[0:6],
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=environment_order,
    )["jobQueueArn"]

    container_properties = {
        "image": "busybox:latest",
        "vcpus": 1,
        "memory": 128,
        "command": commands,
    }
    job_def_arn = batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        containerProperties=container_properties,
    )["jobDefinitionArn"]

    return job_def_arn, queue_arn
|