moto/tests/test_batch/test_batch_task_definition.py

232 lines
7.2 KiB
Python

from . import _get_clients, _setup
import random
import pytest
import sure # noqa # pylint: disable=unused-import
from moto import mock_batch, mock_iam, mock_ec2, mock_ecs
from uuid import uuid4
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_register_task_definition(use_resource_reqs):
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
resp = register_job_def(batch_client, use_resource_reqs=use_resource_reqs)
resp.should.contain("jobDefinitionArn")
resp.should.contain("jobDefinitionName")
resp.should.contain("revision")
assert resp["jobDefinitionArn"].endswith(
"{0}:{1}".format(resp["jobDefinitionName"], resp["revision"])
)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_register_task_definition_with_tags():
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
resp = register_job_def_with_tags(batch_client)
resp.should.contain("jobDefinitionArn")
resp.should.contain("jobDefinitionName")
resp.should.contain("revision")
assert resp["jobDefinitionArn"].endswith(
"{0}:{1}".format(resp["jobDefinitionName"], resp["revision"])
)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_reregister_task_definition(use_resource_reqs):
# Reregistering task with the same name bumps the revision number
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
job_def_name = str(uuid4())[0:6]
resp1 = register_job_def(
batch_client, definition_name=job_def_name, use_resource_reqs=use_resource_reqs
)
resp1.should.contain("jobDefinitionArn")
resp1.should.have.key("jobDefinitionName").equals(job_def_name)
resp1.should.contain("revision")
assert resp1["jobDefinitionArn"].endswith(
"{0}:{1}".format(resp1["jobDefinitionName"], resp1["revision"])
)
resp1["revision"].should.equal(1)
resp2 = register_job_def(
batch_client, definition_name=job_def_name, use_resource_reqs=use_resource_reqs
)
resp2["revision"].should.equal(2)
resp2["jobDefinitionArn"].should_not.equal(resp1["jobDefinitionArn"])
resp3 = register_job_def(
batch_client, definition_name=job_def_name, use_resource_reqs=use_resource_reqs
)
resp3["revision"].should.equal(3)
resp3["jobDefinitionArn"].should_not.equal(resp1["jobDefinitionArn"])
resp3["jobDefinitionArn"].should_not.equal(resp2["jobDefinitionArn"])
resp4 = register_job_def(
batch_client, definition_name=job_def_name, use_resource_reqs=use_resource_reqs
)
resp4["revision"].should.equal(4)
resp4["jobDefinitionArn"].should_not.equal(resp1["jobDefinitionArn"])
resp4["jobDefinitionArn"].should_not.equal(resp2["jobDefinitionArn"])
resp4["jobDefinitionArn"].should_not.equal(resp3["jobDefinitionArn"])
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_delete_task_definition(use_resource_reqs):
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
resp = register_job_def(
batch_client, definition_name=str(uuid4()), use_resource_reqs=use_resource_reqs
)
name = resp["jobDefinitionName"]
batch_client.deregister_job_definition(jobDefinition=resp["jobDefinitionArn"])
all_defs = batch_client.describe_job_definitions()["jobDefinitions"]
[jobdef["jobDefinitionName"] for jobdef in all_defs].shouldnt.contain(name)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_delete_task_definition_by_name(use_resource_reqs):
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
resp = register_job_def(
batch_client, definition_name=str(uuid4()), use_resource_reqs=use_resource_reqs
)
name = resp["jobDefinitionName"]
batch_client.deregister_job_definition(jobDefinition=f"{name}:{resp['revision']}")
all_defs = batch_client.describe_job_definitions()["jobDefinitions"]
[jobdef["jobDefinitionName"] for jobdef in all_defs].shouldnt.contain(name)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.parametrize("use_resource_reqs", [True, False])
def test_describe_task_definition(use_resource_reqs):
ec2_client, iam_client, _, _, batch_client = _get_clients()
_setup(ec2_client, iam_client)
sleep_def_name = f"sleep10_{str(uuid4())[0:6]}"
other_name = str(uuid4())[0:6]
tagged_name = str(uuid4())[0:6]
register_job_def(
batch_client,
definition_name=sleep_def_name,
use_resource_reqs=use_resource_reqs,
)
register_job_def(
batch_client,
definition_name=sleep_def_name,
use_resource_reqs=use_resource_reqs,
)
register_job_def(
batch_client, definition_name=other_name, use_resource_reqs=use_resource_reqs
)
register_job_def_with_tags(batch_client, definition_name=tagged_name)
resp = batch_client.describe_job_definitions(jobDefinitionName=sleep_def_name)
len(resp["jobDefinitions"]).should.equal(2)
job_defs = batch_client.describe_job_definitions()["jobDefinitions"]
all_names = [jd["jobDefinitionName"] for jd in job_defs]
all_names.should.contain(sleep_def_name)
all_names.should.contain(other_name)
all_names.should.contain(tagged_name)
resp = batch_client.describe_job_definitions(
jobDefinitions=[sleep_def_name, other_name]
)
len(resp["jobDefinitions"]).should.equal(3)
resp["jobDefinitions"][0]["tags"].should.equal({})
resp = batch_client.describe_job_definitions(jobDefinitionName=tagged_name)
resp["jobDefinitions"][0]["tags"].should.equal(
{"foo": "123", "bar": "456",}
)
for job_definition in resp["jobDefinitions"]:
job_definition["status"].should.equal("ACTIVE")
def register_job_def(batch_client, definition_name="sleep10", use_resource_reqs=True):
container_properties = {
"image": "busybox",
"command": ["sleep", "10"],
}
if use_resource_reqs:
container_properties.update(
{
"resourceRequirements": [
{"value": "1", "type": "VCPU"},
{"value": str(random.randint(4, 128)), "type": "MEMORY"},
]
}
)
else:
container_properties.update(
{"memory": random.randint(4, 128), "vcpus": 1,}
)
return batch_client.register_job_definition(
jobDefinitionName=definition_name,
type="container",
containerProperties={
"image": "busybox",
"vcpus": 1,
"memory": random.randint(4, 128),
"command": ["sleep", "10"],
},
)
def register_job_def_with_tags(batch_client, definition_name="sleep10"):
return batch_client.register_job_definition(
jobDefinitionName=definition_name,
type="container",
containerProperties={
"image": "busybox",
"vcpus": 1,
"memory": random.randint(4, 128),
"command": ["sleep", "10"],
},
tags={"foo": "123", "bar": "456",},
)