Merge pull request #2808 from bblommers/feature/2201

Batch - various fixes
Steve Pulec 2020-03-15 16:45:57 -05:00 committed by GitHub
commit 6a887e52cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 32 deletions

View File

@@ -301,7 +301,7 @@ class Job(threading.Thread, BaseModel):
         self.job_name = name
         self.job_id = str(uuid.uuid4())
         self.job_definition = job_def
-        self.container_overrides = container_overrides
+        self.container_overrides = container_overrides or {}
         self.job_queue = job_queue
         self.job_state = "SUBMITTED"  # One of SUBMITTED | PENDING | RUNNABLE | STARTING | RUNNING | SUCCEEDED | FAILED
         self.job_queue.jobs.append(self)
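
Note: a minimal sketch (not taken from the diff; resolve_command and the fallback command are hypothetical) of why the "or {}" default matters. submit_job may pass containerOverrides=None, while downstream code expects a dict:

# Hypothetical helper mirroring the "or {}" pattern introduced above.
def resolve_command(container_overrides, default_command):
    overrides = container_overrides or {}
    return overrides.get("command", default_command)

assert resolve_command(None, ["sleep", "10"]) == ["sleep", "10"]
assert resolve_command({"command": ["echo", "hello"]}, ["sleep", "10"]) == ["echo", "hello"]
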
@@ -317,6 +317,7 @@ class Job(threading.Thread, BaseModel):
         self.docker_client = docker.from_env()
         self._log_backend = log_backend
+        self.log_stream_name = None

         # Unfortunately mocking replaces this method w/o fallback enabled, so we
         # need to replace it if we detect it's been mocked
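
Note: a minimal, hypothetical illustration (JobSketch is not real moto code) of why the attribute is initialised up front: code that inspects job.log_stream_name before the job thread has produced any logs should see None rather than hit an AttributeError.

class JobSketch:
    def __init__(self):
        # Mirrors the new initialisation: populated later, once the container starts logging.
        self.log_stream_name = None

job = JobSketch()
assert job.log_stream_name is None  # safe to inspect before the job has run
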
@@ -338,10 +339,11 @@ class Job(threading.Thread, BaseModel):
             "jobId": self.job_id,
             "jobName": self.job_name,
             "jobQueue": self.job_queue.arn,
-            "startedAt": datetime2int(self.job_started_at),
             "status": self.job_state,
             "dependsOn": [],
         }
+        if result["status"] not in ["SUBMITTED", "PENDING", "RUNNABLE", "STARTING"]:
+            result["startedAt"] = datetime2int(self.job_started_at)
         if self.job_stopped:
             result["stoppedAt"] = datetime2int(self.job_stopped_at)
             result["container"] = {}
@@ -503,7 +505,10 @@ class Job(threading.Thread, BaseModel):
            for line in logs_stdout + logs_stderr:
                date, line = line.split(" ", 1)
                date = dateutil.parser.parse(date)
-                date = int(date.timestamp())
+                # TODO: Replace with int(date.timestamp()) once we yeet Python2 out of the window
+                date = int(
+                    (time.mktime(date.timetuple()) + date.microsecond / 1000000.0)
+                )
                logs.append({"timestamp": date, "message": line.strip()})

            # Send to cloudwatch
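
Note: a minimal sketch (not part of the diff) showing that the Python-2-compatible conversion above agrees with int(date.timestamp()) on Python 3 for naive local datetimes away from DST transitions:

import datetime
import time

date = datetime.datetime(2020, 3, 15, 16, 45, 57, 500000)
py2_compatible = int(time.mktime(date.timetuple()) + date.microsecond / 1000000.0)
assert py2_compatible == int(date.timestamp())
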

View File

@@ -10,17 +10,6 @@ import functools
 import nose

-
-def expected_failure(test):
-    @functools.wraps(test)
-    def inner(*args, **kwargs):
-        try:
-            test(*args, **kwargs)
-        except Exception as err:
-            raise nose.SkipTest
-
-    return inner
-
 DEFAULT_REGION = "eu-central-1"
@@ -692,7 +681,8 @@ def test_submit_job_by_name():
 # SLOW TESTS

-@expected_failure
 @mock_logs
 @mock_ec2
 @mock_ecs
@@ -720,13 +710,13 @@ def test_submit_job():
     queue_arn = resp["jobQueueArn"]

     resp = batch_client.register_job_definition(
-        jobDefinitionName="sleep10",
+        jobDefinitionName="sayhellotomylittlefriend",
         type="container",
         containerProperties={
-            "image": "busybox",
+            "image": "busybox:latest",
             "vcpus": 1,
             "memory": 128,
-            "command": ["sleep", "10"],
+            "command": ["echo", "hello"],
         },
     )
     job_def_arn = resp["jobDefinitionArn"]
@@ -740,13 +730,6 @@ def test_submit_job():

     while datetime.datetime.now() < future:
         resp = batch_client.describe_jobs(jobs=[job_id])
-        print(
-            "{0}:{1} {2}".format(
-                resp["jobs"][0]["jobName"],
-                resp["jobs"][0]["jobId"],
-                resp["jobs"][0]["status"],
-            )
-        )

         if resp["jobs"][0]["status"] == "FAILED":
             raise RuntimeError("Batch job failed")
@@ -763,10 +746,9 @@ def test_submit_job():
     resp = logs_client.get_log_events(
         logGroupName="/aws/batch/job", logStreamName=ls_name
     )
-    len(resp["events"]).should.be.greater_than(5)
+    [event["message"] for event in resp["events"]].should.equal(["hello"])


-@expected_failure
 @mock_logs
 @mock_ec2
 @mock_ecs
@@ -794,13 +776,13 @@ def test_list_jobs():
     queue_arn = resp["jobQueueArn"]

     resp = batch_client.register_job_definition(
-        jobDefinitionName="sleep10",
+        jobDefinitionName="sleep5",
         type="container",
         containerProperties={
-            "image": "busybox",
+            "image": "busybox:latest",
             "vcpus": 1,
             "memory": 128,
-            "command": ["sleep", "10"],
+            "command": ["sleep", "5"],
         },
     )
     job_def_arn = resp["jobDefinitionArn"]
@@ -843,7 +825,6 @@ def test_list_jobs():
     len(resp_finished_jobs2["jobSummaryList"]).should.equal(2)


-@expected_failure
 @mock_logs
 @mock_ec2
 @mock_ecs
@@ -874,7 +855,7 @@ def test_terminate_job():
         jobDefinitionName="sleep10",
         type="container",
         containerProperties={
-            "image": "busybox",
+            "image": "busybox:latest",
             "vcpus": 1,
             "memory": 128,
             "command": ["sleep", "10"],