From 41de9b82ace6131ce4e143d66cd179ad14d16aae Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 27 Nov 2021 05:25:53 -0100 Subject: [PATCH] Batch - implement attemptDurationSeconds (#4636) * Batch - implement attemptDurationSeconds * Batch tests - make job def names unique --- moto/batch/models.py | 52 +++++++++++++++-- moto/batch/responses.py | 4 ++ tests/test_batch/test_batch_jobs.py | 87 ++++++++++++++++++++++++++++- 3 files changed, 134 insertions(+), 9 deletions(-) diff --git a/moto/batch/models.py b/moto/batch/models.py index fa62305c7..fc0307649 100644 --- a/moto/batch/models.py +++ b/moto/batch/models.py @@ -200,6 +200,7 @@ class JobDefinition(CloudFormationModel): tags={}, revision=0, retry_strategy=0, + timeout=None, ): self.name = name self.retries = retry_strategy @@ -210,9 +211,8 @@ class JobDefinition(CloudFormationModel): self.arn = None self.status = "ACTIVE" self.tagger = TaggingService() - if parameters is None: - parameters = {} - self.parameters = parameters + self.parameters = parameters or {} + self.timeout = timeout self._validate() self._update_arn() @@ -295,7 +295,9 @@ class JobDefinition(CloudFormationModel): if vcpus < 1: raise ClientException("container vcpus limit must be greater than 0") - def update(self, parameters, _type, container_properties, retry_strategy, tags): + def update( + self, parameters, _type, container_properties, retry_strategy, tags, timeout + ): if parameters is None: parameters = self.parameters @@ -317,6 +319,7 @@ class JobDefinition(CloudFormationModel): revision=self.revision, retry_strategy=retry_strategy, tags=tags, + timeout=timeout, ) def describe(self): @@ -333,6 +336,8 @@ class JobDefinition(CloudFormationModel): result["containerProperties"] = self.container_properties if self.retries is not None and self.retries > 0: result["retryStrategy"] = {"attempts": self.retries} + if self.timeout: + result["timeout"] = self.timeout return result @@ -362,6 +367,7 @@ class JobDefinition(CloudFormationModel): tags=lowercase_first_key(properties.get("Tags", {})), retry_strategy=lowercase_first_key(properties["RetryStrategy"]), container_properties=lowercase_first_key(properties["ContainerProperties"]), + timeout=lowercase_first_key(properties.get("timeout", {})), ) arn = res[1] @@ -378,6 +384,7 @@ class Job(threading.Thread, BaseModel, DockerModel): container_overrides, depends_on, all_jobs, + timeout, ): """ Docker Job @@ -405,6 +412,7 @@ class Job(threading.Thread, BaseModel, DockerModel): self.job_stopped = False self.job_stopped_reason = None self.depends_on = depends_on + self.timeout = timeout self.all_jobs = all_jobs self.stop = False @@ -447,6 +455,8 @@ class Job(threading.Thread, BaseModel, DockerModel): result["container"]["logStreamName"] = self.log_stream_name if self.job_stopped_reason is not None: result["statusReason"] = self.job_stopped_reason + if self.timeout: + result["timeout"] = self.timeout return result def _get_container_property(self, p, default): @@ -474,6 +484,13 @@ class Job(threading.Thread, BaseModel, DockerModel): p, self.job_definition.container_properties.get(p, default) ) + def _get_attempt_duration(self): + if self.timeout: + return self.timeout["attemptDurationSeconds"] + if self.job_definition.timeout: + return self.job_definition.timeout["attemptDurationSeconds"] + return None + def run(self): """ Run the container. @@ -546,10 +563,23 @@ class Job(threading.Thread, BaseModel, DockerModel): self.job_state = "RUNNING" try: container.reload() + + max_time = None + if self._get_attempt_duration(): + attempt_duration = self._get_attempt_duration() + max_time = self.job_started_at + datetime.timedelta( + seconds=attempt_duration + ) + while container.status == "running" and not self.stop: container.reload() time.sleep(0.5) + if max_time and datetime.datetime.now() > max_time: + raise Exception( + "Job time exceeded the configured attemptDurationSeconds" + ) + # Container should be stopped by this point... unless asked to stop if container.status == "running": container.kill() @@ -1266,7 +1296,14 @@ class BatchBackend(BaseBackend): del self._job_queues[job_queue.arn] def register_job_definition( - self, def_name, parameters, _type, tags, retry_strategy, container_properties + self, + def_name, + parameters, + _type, + tags, + retry_strategy, + container_properties, + timeout, ): if def_name is None: raise ClientException("jobDefinitionName must be provided") @@ -1288,11 +1325,12 @@ class BatchBackend(BaseBackend): tags=tags, region_name=self.region_name, retry_strategy=retry_strategy, + timeout=timeout, ) else: # Make new jobdef job_def = job_def.update( - parameters, _type, container_properties, retry_strategy, tags + parameters, _type, container_properties, retry_strategy, tags, timeout ) self._job_definitions[job_def.arn] = job_def @@ -1347,6 +1385,7 @@ class BatchBackend(BaseBackend): retries=None, depends_on=None, container_overrides=None, + timeout=None, ): # TODO parameters, retries (which is a dict raw from request), job dependencies and container overrides are ignored for now @@ -1369,6 +1408,7 @@ class BatchBackend(BaseBackend): container_overrides=container_overrides, depends_on=depends_on, all_jobs=self._jobs, + timeout=timeout, ) self._jobs[job.job_id] = job diff --git a/moto/batch/responses.py b/moto/batch/responses.py index 62ca04a44..01cb6d94f 100644 --- a/moto/batch/responses.py +++ b/moto/batch/responses.py @@ -179,6 +179,7 @@ class BatchResponse(BaseResponse): tags = self._get_param("tags") retry_strategy = self._get_param("retryStrategy") _type = self._get_param("type") + timeout = self._get_param("timeout") try: name, arn, revision = self.batch_backend.register_job_definition( def_name=def_name, @@ -187,6 +188,7 @@ class BatchResponse(BaseResponse): tags=tags, retry_strategy=retry_strategy, container_properties=container_properties, + timeout=timeout, ) except AWSError as err: return err.response() @@ -231,6 +233,7 @@ class BatchResponse(BaseResponse): job_queue = self._get_param("jobQueue") parameters = self._get_param("parameters") retries = self._get_param("retryStrategy") + timeout = self._get_param("timeout") try: name, job_id = self.batch_backend.submit_job( @@ -241,6 +244,7 @@ class BatchResponse(BaseResponse): retries=retries, depends_on=depends_on, container_overrides=container_overrides, + timeout=timeout, ) except AWSError as err: return err.response() diff --git a/tests/test_batch/test_batch_jobs.py b/tests/test_batch/test_batch_jobs.py index 5abe209ad..4f0c5cb5b 100644 --- a/tests/test_batch/test_batch_jobs.py +++ b/tests/test_batch/test_batch_jobs.py @@ -684,8 +684,9 @@ def test_update_job_definition(): "vcpus": 2, } + job_def_name = str(uuid4())[0:6] batch_client.register_job_definition( - jobDefinitionName="test-job", + jobDefinitionName=job_def_name, type="container", tags=tags[0], parameters={}, @@ -694,20 +695,100 @@ def test_update_job_definition(): container_props["memory"] = 2048 batch_client.register_job_definition( - jobDefinitionName="test-job", + jobDefinitionName=job_def_name, type="container", tags=tags[1], parameters={}, containerProperties=container_props, ) - job_defs = batch_client.describe_job_definitions(jobDefinitionName="test-job")[ + job_defs = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)[ "jobDefinitions" ] job_defs.should.have.length_of(2) job_defs[0]["containerProperties"]["memory"].should.equal(1024) job_defs[0]["tags"].should.equal(tags[0]) + job_defs[0].shouldnt.have.key("timeout") job_defs[1]["containerProperties"]["memory"].should.equal(2048) job_defs[1]["tags"].should.equal(tags[1]) + + +@mock_batch +def test_register_job_definition_with_timeout(): + _, _, _, _, batch_client = _get_clients() + + container_props = { + "image": "amazonlinux", + "memory": 1024, + "vcpus": 2, + } + + job_def_name = str(uuid4())[0:6] + batch_client.register_job_definition( + jobDefinitionName=job_def_name, + type="container", + parameters={}, + containerProperties=container_props, + timeout={"attemptDurationSeconds": 3}, + ) + + resp = batch_client.describe_job_definitions(jobDefinitionName=job_def_name) + job_def = resp["jobDefinitions"][0] + job_def.should.have.key("timeout").equals({"attemptDurationSeconds": 3}) + + +@mock_batch +@mock_ec2 +@mock_iam +def test_submit_job_with_timeout(): + ec2_client, iam_client, _, _, batch_client = _get_clients() + _, _, _, iam_arn = _setup(ec2_client, iam_client) + + job_def_name = str(uuid4())[0:6] + commands = ["sleep", "3"] + job_def_arn, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) + + resp = batch_client.submit_job( + jobName=str(uuid4())[0:6], + jobQueue=queue_arn, + jobDefinition=job_def_arn, + timeout={"attemptDurationSeconds": 1}, + ) + job_id = resp["jobId"] + + # This should fail, as the job-duration is longer than the attemptDurationSeconds + _wait_for_job_status(batch_client, job_id, "FAILED") + + +@mock_batch +@mock_ec2 +@mock_iam +def test_submit_job_with_timeout_set_at_definition(): + ec2_client, iam_client, _, _, batch_client = _get_clients() + _, _, _, iam_arn = _setup(ec2_client, iam_client) + + job_def_name = str(uuid4())[0:6] + commands = ["sleep", "3"] + _, queue_arn = prepare_job(batch_client, commands, iam_arn, job_def_name) + resp = batch_client.register_job_definition( + jobDefinitionName=job_def_name, + type="container", + containerProperties={ + "image": "busybox:latest", + "vcpus": 1, + "memory": 128, + "command": commands, + }, + timeout={"attemptDurationSeconds": 1}, + ) + job_def_arn = resp["jobDefinitionArn"] + + resp = batch_client.submit_job( + jobName=str(uuid4())[0:6], jobQueue=queue_arn, jobDefinition=job_def_arn + ) + job_id = resp["jobId"] + + # This should fail, as the job-duration is longer than the attemptDurationSeconds + _wait_for_job_status(batch_client, job_id, "FAILED")