Batch - Add Attempts to JobDescription

This commit is contained in:
Bert Blommers 2022-02-20 21:54:05 -01:00
parent 24afea36c0
commit f0bb052343
3 changed files with 46 additions and 9 deletions

View File

@ -31,7 +31,6 @@ from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.core.utils import unix_time_millis, BackendDict
from moto.utilities.docker_utilities import DockerModel
from moto import settings
from ..utilities.tagging_service import TaggingService
logger = logging.getLogger(__name__)
COMPUTE_ENVIRONMENT_NAME_REGEX = re.compile(
@ -462,6 +461,9 @@ class Job(threading.Thread, BaseModel, DockerModel):
self._log_backend = log_backend
self.log_stream_name = None
self.attempts = []
self.latest_attempt = None
def describe_short(self):
result = {
"jobId": self.job_id,
@ -504,6 +506,7 @@ class Job(threading.Thread, BaseModel, DockerModel):
result["container"]["logStreamName"] = self.log_stream_name
if self.timeout:
result["timeout"] = self.timeout
result["attempts"] = self.attempts
return result
def _get_container_property(self, p, default):
@ -591,6 +594,7 @@ class Job(threading.Thread, BaseModel, DockerModel):
# TODO setup ecs container instance
self.job_started_at = datetime.datetime.now()
self._start_attempt()
# add host.docker.internal host on linux to emulate Mac + Windows behavior
# for communication with other mock AWS services running on localhost
@ -730,6 +734,27 @@ class Job(threading.Thread, BaseModel, DockerModel):
self.job_stopped = True
self.job_stopped_at = datetime.datetime.now()
self.job_state = "SUCCEEDED" if success else "FAILED"
self._stop_attempt()
def _start_attempt(self):
self.latest_attempt = {
"container": {
"containerInstanceArn": "TBD",
"logStreamName": self.log_stream_name,
"networkInterfaces": [],
"taskArn": self.job_definition.arn,
}
}
self.latest_attempt["startedAt"] = datetime2int_milliseconds(
self.job_started_at
)
self.attempts.append(self.latest_attempt)
def _stop_attempt(self):
self.latest_attempt["container"]["logStreamName"] = self.log_stream_name
self.latest_attempt["stoppedAt"] = datetime2int_milliseconds(
self.job_stopped_at
)
def terminate(self, reason):
if not self.stop:

View File

@ -127,15 +127,28 @@ def test_submit_job():
# Test that describe_jobs() returns timestamps in milliseconds
# github.com/spulec/moto/issues/4364
resp = batch_client.describe_jobs(jobs=[job_id])
created_at = resp["jobs"][0]["createdAt"]
started_at = resp["jobs"][0]["startedAt"]
stopped_at = resp["jobs"][0]["stoppedAt"]
job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
created_at = job["createdAt"]
started_at = job["startedAt"]
stopped_at = job["stoppedAt"]
created_at.should.be.greater_than(start_time_milliseconds)
started_at.should.be.greater_than(start_time_milliseconds)
stopped_at.should.be.greater_than(start_time_milliseconds)
# Verify we track attempts
job.should.have.key("attempts").length_of(1)
attempt = job["attempts"][0]
attempt.should.have.key("container")
attempt["container"].should.have.key("containerInstanceArn")
attempt["container"].should.have.key("logStreamName").equals(
job["container"]["logStreamName"]
)
attempt["container"].should.have.key("networkInterfaces")
attempt["container"].should.have.key("taskArn")
attempt.should.have.key("startedAt").equals(started_at)
attempt.should.have.key("stoppedAt").equals(stopped_at)
@mock_logs
@mock_ec2

View File

@ -27,11 +27,10 @@ def test_register_task_definition(use_resource_reqs):
def test_register_task_definition_with_tags(propagate_tags):
_, _, _, _, batch_client = _get_clients()
resp = register_job_def_with_tags(batch_client, propagate_tags=propagate_tags)
job_def_name = str(uuid4())[0:8]
register_job_def_with_tags(batch_client, job_def_name, propagate_tags)
resp = batch_client.describe_job_definitions(
jobDefinitionName=resp["jobDefinitionName"]
)
resp = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)
job_def = resp["jobDefinitions"][0]
if propagate_tags is None:
job_def.shouldnt.have.key("propagateTags")