341 lines
11 KiB
Python
341 lines
11 KiB
Python
from . import _get_clients
|
|
import random
|
|
import pytest
|
|
import sure # noqa # pylint: disable=unused-import
|
|
from moto import mock_batch
|
|
from uuid import uuid4
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_register_task_definition(use_resource_reqs):
    """Registering a job definition returns its ARN, name and revision.

    Parametrized so the definition is registered once with
    ``resourceRequirements`` and once with plain ``vcpus``/``memory``.
    """
    _, _, _, _, batch_client = _get_clients()

    resp = register_job_def(batch_client, use_resource_reqs=use_resource_reqs)

    resp.should.contain("jobDefinitionArn")
    resp.should.contain("jobDefinitionName")
    resp.should.contain("revision")

    # The ARN is expected to end in "<name>:<revision>".
    assert resp["jobDefinitionArn"].endswith(
        f"{resp['jobDefinitionName']}:{resp['revision']}"
    )
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("propagate_tags", [None, True, False])
def test_register_task_definition_with_tags(propagate_tags):
    """``propagateTags`` is reported back only when it was supplied."""
    _, _, _, _, batch_client = _get_clients()

    job_def_name = str(uuid4())[0:8]
    register_job_def_with_tags(batch_client, job_def_name, propagate_tags)

    resp = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)
    job_def = resp["jobDefinitions"][0]
    if propagate_tags is None:
        # Nothing was sent, so the key must be absent rather than defaulted.
        job_def.shouldnt.have.key("propagateTags")
    else:
        job_def.should.have.key("propagateTags").equals(propagate_tags)
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("platform_capability", ["EC2", "FARGATE"])
def test_register_task_definition_with_platform_capability(platform_capability):
    """The registered ``platformCapabilities`` list is returned on describe."""
    _, _, _, _, batch_client = _get_clients()

    def_name = str(uuid4())[0:6]
    batch_client.register_job_definition(
        jobDefinitionName=def_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 4,
            "command": ["exit", "0"],
        },
        platformCapabilities=[platform_capability],
    )

    resp = batch_client.describe_job_definitions(jobDefinitionName=def_name)
    resp["jobDefinitions"][0].should.have.key("platformCapabilities").equals(
        [platform_capability]
    )
|
|
|
|
|
|
@mock_batch
def test_register_task_definition_with_retry_strategies():
    """The retry strategy round-trips, with its actions reported in lower case."""
    _, _, _, _, batch_client = _get_clients()

    def_name = str(uuid4())[0:6]
    batch_client.register_job_definition(
        jobDefinitionName=def_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 4,
            "command": ["exit", "0"],
        },
        retryStrategy={
            "attempts": 4,
            # Deliberately mixed casing: the describe call below is expected
            # to hand the actions back lower-cased.
            "evaluateOnExit": [
                {"onStatusReason": "osr", "action": "RETRY"},
                {"onStatusReason": "osr2", "action": "Exit"},
            ],
        },
    )

    resp = batch_client.describe_job_definitions(jobDefinitionName=def_name)
    resp["jobDefinitions"][0].should.have.key("retryStrategy").equals(
        {
            "attempts": 4,
            "evaluateOnExit": [
                {"onStatusReason": "osr", "action": "retry"},
                {"onStatusReason": "osr2", "action": "exit"},
            ],
        }
    )
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_reregister_task_definition(use_resource_reqs):
    """Registering the same name again bumps the revision and changes the ARN."""
    _, _, _, _, batch_client = _get_clients()

    job_def_name = str(uuid4())[0:6]
    resp1 = register_job_def(
        batch_client, definition_name=job_def_name, use_resource_reqs=use_resource_reqs
    )

    resp1.should.contain("jobDefinitionArn")
    resp1.should.have.key("jobDefinitionName").equals(job_def_name)
    resp1.should.contain("revision")

    assert resp1["jobDefinitionArn"].endswith(
        f"{resp1['jobDefinitionName']}:{resp1['revision']}"
    )
    resp1["revision"].should.equal(1)

    # Revisions 2..4: each registration must carry the next revision number
    # and an ARN that differs from every ARN handed out before it.
    previous_arns = [resp1["jobDefinitionArn"]]
    for expected_revision in (2, 3, 4):
        resp = register_job_def(
            batch_client,
            definition_name=job_def_name,
            use_resource_reqs=use_resource_reqs,
        )
        resp["revision"].should.equal(expected_revision)

        for earlier_arn in previous_arns:
            resp["jobDefinitionArn"].should_not.equal(earlier_arn)
        previous_arns.append(resp["jobDefinitionArn"])
|
|
|
|
|
|
@mock_batch
def test_reregister_task_definition_should_not_reuse_parameters_from_inactive_definition():
    """A new revision must not inherit parameters from a deregistered one.

    Revision 1 is registered with ``parameters`` and then deregistered;
    revision 2 is registered under the same name without any parameters.
    """
    _, _, _, _, batch_client = _get_clients()

    job_def_name = str(uuid4())[0:6]
    # Register job definition with parameters
    resp = batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 48,
            "command": ["sleep", "0"],
        },
        parameters={"param1": "val1"},
    )
    job_def_arn = resp["jobDefinitionArn"]

    definitions = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)[
        "jobDefinitions"
    ]
    definitions.should.have.length_of(1)

    definitions[0].should.have.key("parameters").equals({"param1": "val1"})

    # Deactivate the definition
    batch_client.deregister_job_definition(jobDefinition=job_def_arn)

    # Second job definition does not provide any parameters
    batch_client.register_job_definition(
        jobDefinitionName=job_def_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 96,
            "command": ["sleep", "0"],
        },
    )

    definitions = batch_client.describe_job_definitions(jobDefinitionName=job_def_name)[
        "jobDefinitions"
    ]
    definitions.should.have.length_of(2)

    # Only the inactive definition should have the parameters
    actual = [(d["revision"], d["status"], d.get("parameters")) for d in definitions]
    actual.should.contain((1, "INACTIVE", {"param1": "val1"}))
    actual.should.contain((2, "ACTIVE", {}))
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_delete_task_definition(use_resource_reqs):
    """Deregistering by ARN leaves the definition listed, marked INACTIVE."""
    _, _, _, _, batch_client = _get_clients()

    resp = register_job_def(
        batch_client, definition_name=str(uuid4()), use_resource_reqs=use_resource_reqs
    )
    name = resp["jobDefinitionName"]

    batch_client.deregister_job_definition(jobDefinition=resp["jobDefinitionArn"])

    # The definition is still returned by an unfiltered describe call...
    all_defs = batch_client.describe_job_definitions()["jobDefinitions"]
    [jobdef["jobDefinitionName"] for jobdef in all_defs].should.contain(name)

    # ...and a lookup by name shows its single revision as INACTIVE.
    definitions = batch_client.describe_job_definitions(jobDefinitionName=name)[
        "jobDefinitions"
    ]
    definitions.should.have.length_of(1)

    definitions[0].should.have.key("revision").equals(1)
    definitions[0].should.have.key("status").equals("INACTIVE")
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_delete_task_definition_by_name(use_resource_reqs):
    """Deregistering by ``name:revision`` works; re-registering adds revision 2."""
    _, _, _, _, batch_client = _get_clients()

    resp = register_job_def(
        batch_client, definition_name=str(uuid4()), use_resource_reqs=use_resource_reqs
    )
    name = resp["jobDefinitionName"]

    # Address the definition as "<name>:<revision>" instead of by ARN.
    batch_client.deregister_job_definition(jobDefinition=f"{name}:{resp['revision']}")

    all_defs = batch_client.describe_job_definitions()["jobDefinitions"]
    # We should still see our job definition as INACTIVE, as it is kept for 180 days
    [jobdef["jobDefinitionName"] for jobdef in all_defs].should.contain(name)

    # Registering the job definition again should up the revision number
    register_job_def(
        batch_client, definition_name=name, use_resource_reqs=use_resource_reqs
    )

    definitions = batch_client.describe_job_definitions(jobDefinitionName=name)[
        "jobDefinitions"
    ]
    definitions.should.have.length_of(2)

    revision_status = [
        {"revision": d["revision"], "status": d["status"]} for d in definitions
    ]

    revision_status.should.contain({"revision": 1, "status": "INACTIVE"})
    revision_status.should.contain({"revision": 2, "status": "ACTIVE"})
|
|
|
|
|
|
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_describe_task_definition(use_resource_reqs):
    """Describe job definitions by name, unfiltered, and by a list of names.

    Registers two revisions under one name, one definition under a second
    name and one tagged definition under a third, then checks each lookup.
    """
    _, _, _, _, batch_client = _get_clients()

    sleep_def_name = f"sleep10_{str(uuid4())[0:6]}"
    other_name = str(uuid4())[0:6]
    tagged_name = str(uuid4())[0:6]

    # Same name twice, so sleep_def_name ends up with two revisions.
    for _ in range(2):
        register_job_def(
            batch_client,
            definition_name=sleep_def_name,
            use_resource_reqs=use_resource_reqs,
        )
    register_job_def(
        batch_client, definition_name=other_name, use_resource_reqs=use_resource_reqs
    )
    register_job_def_with_tags(batch_client, definition_name=tagged_name)

    resp = batch_client.describe_job_definitions(jobDefinitionName=sleep_def_name)
    resp["jobDefinitions"].should.have.length_of(2)

    job_defs = batch_client.describe_job_definitions()["jobDefinitions"]
    all_names = [jd["jobDefinitionName"] for jd in job_defs]
    all_names.should.contain(sleep_def_name)
    all_names.should.contain(other_name)
    all_names.should.contain(tagged_name)

    # Two revisions of sleep_def_name plus the single other_name definition.
    resp = batch_client.describe_job_definitions(
        jobDefinitions=[sleep_def_name, other_name]
    )
    resp["jobDefinitions"].should.have.length_of(3)
    resp["jobDefinitions"][0]["tags"].should.equal({})

    resp = batch_client.describe_job_definitions(jobDefinitionName=tagged_name)
    resp["jobDefinitions"][0]["tags"].should.equal({"foo": "123", "bar": "456"})

    # NOTE(review): resp is the tagged_name lookup at this point, so this loop
    # only covers the tagged definition, not the ones fetched earlier.
    for job_definition in resp["jobDefinitions"]:
        job_definition["status"].should.equal("ACTIVE")
        job_definition.shouldnt.have.key("platformCapabilities")
        job_definition.shouldnt.have.key("retryStrategy")
|
|
|
|
|
|
def register_job_def(batch_client, definition_name="sleep10", use_resource_reqs=True):
    """Register a busybox ``sleep 10`` container job definition.

    Args:
        batch_client: Object exposing ``register_job_definition(**kwargs)``.
        definition_name: Value sent as ``jobDefinitionName``.
        use_resource_reqs: When true, CPU and memory are sent as
            ``resourceRequirements`` entries; otherwise as the plain
            ``vcpus`` and ``memory`` container properties.

    Returns:
        Whatever ``batch_client.register_job_definition`` returns.
    """
    container_properties = {"image": "busybox", "command": ["sleep", "10"]}

    if use_resource_reqs:
        container_properties["resourceRequirements"] = [
            {"value": "0.25", "type": "VCPU"},
            {"value": "512", "type": "MEMORY"},
        ]
    else:
        container_properties.update({"memory": 128, "vcpus": 1})

    return batch_client.register_job_definition(
        jobDefinitionName=definition_name,
        type="container",
        containerProperties=container_properties,
    )
|
|
|
|
|
|
def register_job_def_with_tags(
    batch_client, definition_name="sleep10", propagate_tags=False
):
    """Register a tagged busybox ``sleep 10`` container job definition.

    Args:
        batch_client: Object exposing ``register_job_definition(**kwargs)``.
        definition_name: Value sent as ``jobDefinitionName``.
        propagate_tags: Sent as ``propagateTags``; pass ``None`` to leave the
            argument out of the request entirely.

    Returns:
        Whatever ``batch_client.register_job_definition`` returns.
    """
    # Only include propagateTags when the caller gave an explicit value.
    kwargs = {} if propagate_tags is None else {"propagateTags": propagate_tags}
    return batch_client.register_job_definition(
        jobDefinitionName=definition_name,
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            # Memory is picked at random within [4, 128] on every call.
            "memory": random.randint(4, 128),
            "command": ["sleep", "10"],
        },
        tags={"foo": "123", "bar": "456"},
        **kwargs,
    )
|