Increase Batch timeouts, in case Docker takes a while to start (#4827)
This commit is contained in:
parent
38bb2d5d1b
commit
e1ffd27201
@ -720,6 +720,10 @@ class Job(threading.Thread, BaseModel, DockerModel):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
if self.stop:
|
||||||
|
# This job has been cancelled while it was waiting for a dependency
|
||||||
|
self._mark_stopped(success=False)
|
||||||
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
|
|
||||||
TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true"
|
TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true"
|
||||||
INITIAL_NO_AUTH_ACTION_COUNT = float(
|
INITIAL_NO_AUTH_ACTION_COUNT = float(
|
||||||
os.environ.get("INITIAL_NO_AUTH_ACTION_COUNT", float("inf"))
|
os.environ.get("INITIAL_NO_AUTH_ACTION_COUNT", float("inf"))
|
||||||
@ -55,6 +58,7 @@ def moto_server_port():
|
|||||||
return os.environ.get("MOTO_PORT") or "5000"
|
return os.environ.get("MOTO_PORT") or "5000"
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache()
|
||||||
def moto_server_host():
|
def moto_server_host():
|
||||||
if is_docker():
|
if is_docker():
|
||||||
return get_docker_host()
|
return get_docker_host()
|
||||||
|
@ -207,11 +207,11 @@ def test_terminate_job():
|
|||||||
)
|
)
|
||||||
job_id = resp["jobId"]
|
job_id = resp["jobId"]
|
||||||
|
|
||||||
_wait_for_job_status(batch_client, job_id, "RUNNING")
|
_wait_for_job_status(batch_client, job_id, "RUNNING", seconds_to_wait=120)
|
||||||
|
|
||||||
batch_client.terminate_job(jobId=job_id, reason="test_terminate")
|
batch_client.terminate_job(jobId=job_id, reason="test_terminate")
|
||||||
|
|
||||||
_wait_for_job_status(batch_client, job_id, "FAILED")
|
_wait_for_job_status(batch_client, job_id, "FAILED", seconds_to_wait=120)
|
||||||
|
|
||||||
resp = batch_client.describe_jobs(jobs=[job_id])
|
resp = batch_client.describe_jobs(jobs=[job_id])
|
||||||
resp["jobs"][0]["jobName"].should.equal("test1")
|
resp["jobs"][0]["jobName"].should.equal("test1")
|
||||||
@ -241,7 +241,7 @@ def test_cancel_pending_job():
|
|||||||
# We need to be able to cancel a job that has not been started yet
|
# We need to be able to cancel a job that has not been started yet
|
||||||
# Locally, our jobs start so fast that we can't cancel them in time
|
# Locally, our jobs start so fast that we can't cancel them in time
|
||||||
# So delay our job, by letting it depend on a slow-running job
|
# So delay our job, by letting it depend on a slow-running job
|
||||||
commands = ["sleep", "1"]
|
commands = ["sleep", "10"]
|
||||||
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, "deptest")
|
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, "deptest")
|
||||||
|
|
||||||
resp = batch_client.submit_job(
|
resp = batch_client.submit_job(
|
||||||
@ -290,14 +290,14 @@ def test_cancel_running_job():
|
|||||||
|
|
||||||
batch_client.cancel_job(jobId=job_id, reason="test_cancel")
|
batch_client.cancel_job(jobId=job_id, reason="test_cancel")
|
||||||
# We cancelled too late, the job was already running. Now we just wait for it to succeed
|
# We cancelled too late, the job was already running. Now we just wait for it to succeed
|
||||||
_wait_for_job_status(batch_client, job_id, "SUCCEEDED", seconds_to_wait=5)
|
_wait_for_job_status(batch_client, job_id, "SUCCEEDED")
|
||||||
|
|
||||||
resp = batch_client.describe_jobs(jobs=[job_id])
|
resp = batch_client.describe_jobs(jobs=[job_id])
|
||||||
resp["jobs"][0]["jobName"].should.equal("test_job_name")
|
resp["jobs"][0]["jobName"].should.equal("test_job_name")
|
||||||
resp["jobs"][0].shouldnt.have.key("statusReason")
|
resp["jobs"][0].shouldnt.have.key("statusReason")
|
||||||
|
|
||||||
|
|
||||||
def _wait_for_job_status(client, job_id, status, seconds_to_wait=30):
|
def _wait_for_job_status(client, job_id, status, seconds_to_wait=60):
|
||||||
wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
|
wait_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
|
||||||
last_job_status = None
|
last_job_status = None
|
||||||
while datetime.datetime.now() < wait_time:
|
while datetime.datetime.now() < wait_time:
|
||||||
@ -305,6 +305,7 @@ def _wait_for_job_status(client, job_id, status, seconds_to_wait=30):
|
|||||||
last_job_status = resp["jobs"][0]["status"]
|
last_job_status = resp["jobs"][0]["status"]
|
||||||
if last_job_status == status:
|
if last_job_status == status:
|
||||||
break
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Time out waiting for job status {status}!\n Last status: {last_status}".format(
|
"Time out waiting for job status {status}!\n Last status: {last_status}".format(
|
||||||
@ -476,7 +477,7 @@ def test_failed_dependencies():
|
|||||||
"image": "busybox:latest",
|
"image": "busybox:latest",
|
||||||
"vcpus": 1,
|
"vcpus": 1,
|
||||||
"memory": 128,
|
"memory": 128,
|
||||||
"command": ["exi1", "1"],
|
"command": ["exit", "1"],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
job_def_arn_failure = resp["jobDefinitionArn"]
|
job_def_arn_failure = resp["jobDefinitionArn"]
|
||||||
@ -763,7 +764,7 @@ def test_submit_job_with_timeout():
|
|||||||
_, _, _, iam_arn = _setup(ec2_client, iam_client)
|
_, _, _, iam_arn = _setup(ec2_client, iam_client)
|
||||||
|
|
||||||
job_def_name = str(uuid4())[0:6]
|
job_def_name = str(uuid4())[0:6]
|
||||||
commands = ["sleep", "3"]
|
commands = ["sleep", "30"]
|
||||||
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
|
job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
|
||||||
|
|
||||||
resp = batch_client.submit_job(
|
resp = batch_client.submit_job(
|
||||||
@ -786,7 +787,7 @@ def test_submit_job_with_timeout_set_at_definition():
|
|||||||
_, _, _, iam_arn = _setup(ec2_client, iam_client)
|
_, _, _, iam_arn = _setup(ec2_client, iam_client)
|
||||||
|
|
||||||
job_def_name = str(uuid4())[0:6]
|
job_def_name = str(uuid4())[0:6]
|
||||||
commands = ["sleep", "3"]
|
commands = ["sleep", "30"]
|
||||||
_, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
|
_, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name)
|
||||||
resp = batch_client.register_job_definition(
|
resp = batch_client.register_job_definition(
|
||||||
jobDefinitionName=job_def_name,
|
jobDefinitionName=job_def_name,
|
||||||
|
@ -55,7 +55,7 @@ def test_describe_stack_subresources():
|
|||||||
template_body = simple_queue.substitute(q_name=q_name)
|
template_body = simple_queue.substitute(q_name=q_name)
|
||||||
cf.create_stack(StackName=stack_name, TemplateBody=template_body)
|
cf.create_stack(StackName=stack_name, TemplateBody=template_body)
|
||||||
|
|
||||||
queue_urls = client.list_queues()["QueueUrls"]
|
queue_urls = client.list_queues(QueueNamePrefix=q_name)["QueueUrls"]
|
||||||
assert any(["{}/{}".format(ACCOUNT_ID, q_name) in url for url in queue_urls])
|
assert any(["{}/{}".format(ACCOUNT_ID, q_name) in url for url in queue_urls])
|
||||||
|
|
||||||
stack = res.Stack(stack_name)
|
stack = res.Stack(stack_name)
|
||||||
@ -76,7 +76,7 @@ def test_list_stack_resources():
|
|||||||
template_body = simple_queue.substitute(q_name=q_name)
|
template_body = simple_queue.substitute(q_name=q_name)
|
||||||
cf.create_stack(StackName=stack_name, TemplateBody=template_body)
|
cf.create_stack(StackName=stack_name, TemplateBody=template_body)
|
||||||
|
|
||||||
queue_urls = client.list_queues()["QueueUrls"]
|
queue_urls = client.list_queues(QueueNamePrefix=q_name)["QueueUrls"]
|
||||||
assert any(["{}/{}".format(ACCOUNT_ID, q_name) in url for url in queue_urls])
|
assert any(["{}/{}".format(ACCOUNT_ID, q_name) in url for url in queue_urls])
|
||||||
|
|
||||||
queue = cf.list_stack_resources(StackName=stack_name)["StackResourceSummaries"][0]
|
queue = cf.list_stack_resources(StackName=stack_name)["StackResourceSummaries"][0]
|
||||||
@ -98,7 +98,7 @@ def test_create_from_cloudformation_json_with_tags():
|
|||||||
response = cf.describe_stack_resources(StackName=stack_name)
|
response = cf.describe_stack_resources(StackName=stack_name)
|
||||||
q_name = response["StackResources"][0]["PhysicalResourceId"]
|
q_name = response["StackResources"][0]["PhysicalResourceId"]
|
||||||
|
|
||||||
all_urls = client.list_queues()["QueueUrls"]
|
all_urls = client.list_queues(QueueNamePrefix=q_name)["QueueUrls"]
|
||||||
queue_url = [url for url in all_urls if url.endswith(q_name)][0]
|
queue_url = [url for url in all_urls if url.endswith(q_name)][0]
|
||||||
|
|
||||||
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
|
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
|
||||||
|
Loading…
Reference in New Issue
Block a user