moto/tests/test_batch/test_batch.py

889 lines
27 KiB
Python
Raw Normal View History

2017-09-26 16:37:26 +00:00
from __future__ import unicode_literals
import time
import datetime
2017-09-26 16:37:26 +00:00
import boto3
from botocore.exceptions import ClientError
2017-09-26 16:37:26 +00:00
import sure # noqa
from moto import mock_batch, mock_iam, mock_ec2, mock_ecs, mock_logs
2020-10-06 06:04:09 +00:00
import pytest
2019-10-31 15:44:26 +00:00
DEFAULT_REGION = "eu-central-1"


def _get_clients():
    """Build the boto3 clients every Batch test needs.

    :return: (ec2, iam, ecs, logs, batch) clients, all bound to DEFAULT_REGION
    :rtype: tuple
    """
    service_names = ("ec2", "iam", "ecs", "logs", "batch")
    return tuple(
        boto3.client(service, region_name=DEFAULT_REGION) for service in service_names
    )
def _setup(ec2_client, iam_client):
"""
Do prerequisite setup
:return: VPC ID, Subnet ID, Security group ID, IAM Role ARN
:rtype: tuple
"""
2019-10-31 15:44:26 +00:00
resp = ec2_client.create_vpc(CidrBlock="172.30.0.0/24")
vpc_id = resp["Vpc"]["VpcId"]
resp = ec2_client.create_subnet(
2019-10-31 15:44:26 +00:00
AvailabilityZone="eu-central-1a", CidrBlock="172.30.0.0/25", VpcId=vpc_id
)
2019-10-31 15:44:26 +00:00
subnet_id = resp["Subnet"]["SubnetId"]
resp = ec2_client.create_security_group(
2019-10-31 15:44:26 +00:00
Description="test_sg_desc", GroupName="test_sg", VpcId=vpc_id
)
2019-10-31 15:44:26 +00:00
sg_id = resp["GroupId"]
resp = iam_client.create_role(
2019-10-31 15:44:26 +00:00
RoleName="TestRole", AssumeRolePolicyDocument="some_policy"
)
2019-10-31 15:44:26 +00:00
iam_arn = resp["Role"]["Arn"]
2019-12-13 17:52:37 +00:00
iam_client.create_instance_profile(InstanceProfileName="TestRole")
iam_client.add_role_to_instance_profile(
InstanceProfileName="TestRole", RoleName="TestRole"
)
return vpc_id, subnet_id, sg_id, iam_arn
# Yes, yes it talks to all the things
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_managed_compute_environment():
    """A MANAGED compute environment launches EC2 instances and one ECS cluster."""
    ec2_client, iam_client, ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    instance_profile_arn = iam_arn.replace("role", "instance-profile")
    compute_resources = {
        "type": "EC2",
        "minvCpus": 5,
        "maxvCpus": 10,
        "desiredvCpus": 5,
        "instanceTypes": ["t2.small", "t2.medium"],
        "imageId": "some_image_id",
        "subnets": [subnet_id],
        "securityGroupIds": [sg_id],
        "ec2KeyPair": "string",
        "instanceRole": instance_profile_arn,
        "tags": {"string": "string"},
        "bidPercentage": 123,
        "spotIamFleetRole": "string",
    }

    created = batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="MANAGED",
        state="ENABLED",
        computeResources=compute_resources,
        serviceRole=iam_arn,
    )
    assert "computeEnvironmentArn" in created
    assert created["computeEnvironmentName"] == env_name

    # desiredvCpus=5 with t2.medium (2 vCPU) and t2.small (1 vCPU) should give
    # two mediums and one small, i.e. three reservations.
    instances = ec2_client.describe_instances()
    assert "Reservations" in instances
    assert len(instances["Reservations"]) == 3

    # Exactly one ECS cluster should have been created
    clusters = ecs_client.list_clusters()
    assert "clusterArns" in clusters
    assert len(clusters["clusterArns"]) == 1
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_unmanaged_compute_environment():
    """An UNMANAGED compute environment creates an ECS cluster but no instances."""
    ec2_client, iam_client, ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    created = batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    assert "computeEnvironmentArn" in created
    assert created["computeEnvironmentName"] == env_name

    # It's unmanaged, so no instances should be created
    instances = ec2_client.describe_instances()
    assert "Reservations" in instances
    assert len(instances["Reservations"]) == 0

    # ...but one ECS cluster should still exist
    clusters = ecs_client.list_clusters()
    assert "clusterArns" in clusters
    assert len(clusters["clusterArns"]) == 1
# TODO create 1000s of tests to test complex option combinations of create environment
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_describe_compute_environment():
    """describe_compute_environments lists environments and filters by name."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )

    environments = batch_client.describe_compute_environments()["computeEnvironments"]
    assert len(environments) == 1
    assert environments[0]["computeEnvironmentName"] == env_name

    # Filtering on a name that was never created returns nothing
    filtered = batch_client.describe_compute_environments(computeEnvironments=["test1"])
    assert len(filtered["computeEnvironments"]) == 0
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_delete_unmanaged_compute_environment():
    """Deleting an UNMANAGED environment also removes its ECS cluster."""
    ec2_client, iam_client, ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    batch_client.delete_compute_environment(computeEnvironment=env_name)

    remaining = batch_client.describe_compute_environments()
    assert len(remaining["computeEnvironments"]) == 0

    clusters = ecs_client.list_clusters()
    assert len(clusters.get("clusterArns", [])) == 0
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_delete_managed_compute_environment():
    """Deleting a MANAGED environment terminates its instances and removes its cluster."""
    ec2_client, iam_client, ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    instance_profile_arn = iam_arn.replace("role", "instance-profile")
    compute_resources = {
        "type": "EC2",
        "minvCpus": 5,
        "maxvCpus": 10,
        "desiredvCpus": 5,
        "instanceTypes": ["t2.small", "t2.medium"],
        "imageId": "some_image_id",
        "subnets": [subnet_id],
        "securityGroupIds": [sg_id],
        "ec2KeyPair": "string",
        "instanceRole": instance_profile_arn,
        "tags": {"string": "string"},
        "bidPercentage": 123,
        "spotIamFleetRole": "string",
    }
    batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="MANAGED",
        state="ENABLED",
        computeResources=compute_resources,
        serviceRole=iam_arn,
    )

    batch_client.delete_compute_environment(computeEnvironment=env_name)

    remaining = batch_client.describe_compute_environments()
    assert len(remaining["computeEnvironments"]) == 0

    # The instances stay visible but must all be terminated
    instances = ec2_client.describe_instances()
    assert "Reservations" in instances
    assert len(instances["Reservations"]) == 3
    instance_states = [
        reservation["Instances"][0]["State"]["Name"]
        for reservation in instances["Reservations"]
    ]
    assert all(state == "terminated" for state in instance_states)

    clusters = ecs_client.list_clusters()
    assert len(clusters.get("clusterArns", [])) == 0
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_update_unmanaged_compute_environment_state():
    """update_compute_environment can switch an environment to DISABLED."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_name = "test_compute_env"
    batch_client.create_compute_environment(
        computeEnvironmentName=env_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    batch_client.update_compute_environment(computeEnvironment=env_name, state="DISABLED")

    environments = batch_client.describe_compute_environments()["computeEnvironments"]
    assert len(environments) == 1
    assert environments[0]["state"] == "DISABLED"
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_create_job_queue():
    """Create and describe a job queue, then check invalid creations are rejected.

    Each invalid ``create_job_queue`` call is wrapped in ``pytest.raises`` so
    the test fails if the backend accepts it. A bare ``try/except`` would let
    the test pass silently in that case.
    """
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = "test_compute_env"
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    assert "jobQueueArn" in resp
    assert "jobQueueName" in resp
    queue_arn = resp["jobQueueArn"]

    resp = batch_client.describe_job_queues()
    assert "jobQueues" in resp
    assert len(resp["jobQueues"]) == 1
    assert resp["jobQueues"][0]["jobQueueArn"] == queue_arn

    # Filtering on an unknown queue name returns nothing
    resp = batch_client.describe_job_queues(jobQueues=["test_invalid_queue"])
    assert "jobQueues" in resp
    assert len(resp["jobQueues"]) == 0

    # Create job queue which already exists
    with pytest.raises(ClientError) as exc_info:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue",
            state="ENABLED",
            priority=123,
            computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
        )
    assert exc_info.value.response["Error"]["Code"] == "ClientException"

    # Create job queue with incorrect state
    with pytest.raises(ClientError) as exc_info:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue2",
            state="JUNK",
            priority=123,
            computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
        )
    assert exc_info.value.response["Error"]["Code"] == "ClientException"

    # Create job queue with no compute env. The state is valid here so that
    # this case exercises the empty computeEnvironmentOrder, not the bad state.
    with pytest.raises(ClientError) as exc_info:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue3",
            state="ENABLED",
            priority=123,
            computeEnvironmentOrder=[],
        )
    assert exc_info.value.response["Error"]["Code"] == "ClientException"
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_job_queue_bad_arn():
    """create_job_queue rejects a computeEnvironmentOrder naming an unknown ARN.

    ``pytest.raises`` makes the test fail if no error is raised. The previous
    ``try/except`` passed silently in that case.
    """
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = "test_compute_env"
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    with pytest.raises(ClientError) as exc_info:
        batch_client.create_job_queue(
            jobQueueName="test_job_queue",
            state="ENABLED",
            priority=123,
            computeEnvironmentOrder=[
                {"order": 123, "computeEnvironment": arn + "LALALA"}
            ],
        )
    assert exc_info.value.response["Error"]["Code"] == "ClientException"
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_update_job_queue():
    """update_job_queue changes priority when addressed by ARN and by name."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = "test_compute_env"
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    queue_arn = resp["jobQueueArn"]

    # Update addressed by ARN
    batch_client.update_job_queue(jobQueue=queue_arn, priority=5)
    resp = batch_client.describe_job_queues()
    assert "jobQueues" in resp
    assert len(resp["jobQueues"]) == 1
    assert resp["jobQueues"][0]["priority"] == 5

    # Update addressed by name. Use a different priority so the assertion
    # proves this second update actually took effect; re-setting 5 could not
    # tell a working update from a no-op.
    batch_client.update_job_queue(jobQueue="test_job_queue", priority=15)
    resp = batch_client.describe_job_queues()
    assert "jobQueues" in resp
    assert len(resp["jobQueues"]) == 1
    assert resp["jobQueues"][0]["priority"] == 15
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_delete_job_queue():
    """delete_job_queue removes the queue from describe_job_queues.

    This test was previously also named ``test_update_job_queue``. That
    duplicate name shadowed the real update test, so pytest never ran it.
    """
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    compute_name = "test_compute_env"
    resp = batch_client.create_compute_environment(
        computeEnvironmentName=compute_name,
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )
    arn = resp["computeEnvironmentArn"]

    resp = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": arn}],
    )
    queue_arn = resp["jobQueueArn"]

    batch_client.delete_job_queue(jobQueue=queue_arn)

    resp = batch_client.describe_job_queues()
    assert "jobQueues" in resp
    assert len(resp["jobQueues"]) == 0
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_register_task_definition():
    """Registering a job definition returns its ARN, name and revision."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _setup(ec2_client, iam_client)

    registered = batch_client.register_job_definition(
        jobDefinitionName="sleep10",
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 128,
            "command": ["sleep", "10"],
        },
    )
    for key in ("jobDefinitionArn", "jobDefinitionName", "revision"):
        assert key in registered

    # The ARN ends with "<name>:<revision>"
    expected_suffix = "{0}:{1}".format(
        registered["jobDefinitionName"], registered["revision"]
    )
    assert registered["jobDefinitionArn"].endswith(expected_suffix)
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_reregister_task_definition():
    """Reregistering a job definition with the same name bumps the revision.

    Every revision must also receive an ARN distinct from all earlier ones.
    """
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _setup(ec2_client, iam_client)

    def register_sleep10(memory):
        # Same name every time; only the memory setting differs
        return batch_client.register_job_definition(
            jobDefinitionName="sleep10",
            type="container",
            containerProperties={
                "image": "busybox",
                "vcpus": 1,
                "memory": memory,
                "command": ["sleep", "10"],
            },
        )

    first = register_sleep10(128)
    for key in ("jobDefinitionArn", "jobDefinitionName", "revision"):
        assert key in first
    assert first["jobDefinitionArn"].endswith(
        "{0}:{1}".format(first["jobDefinitionName"], first["revision"])
    )
    assert first["revision"] == 1

    earlier_arns = [first["jobDefinitionArn"]]
    for expected_revision, memory in enumerate((68, 42, 41), start=2):
        current = register_sleep10(memory)
        assert current["revision"] == expected_revision
        assert current["jobDefinitionArn"] not in earlier_arns
        earlier_arns.append(current["jobDefinitionArn"])
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_delete_task_definition():
    """A deregistered job definition no longer appears in describe_job_definitions."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _setup(ec2_client, iam_client)

    definition_arn = batch_client.register_job_definition(
        jobDefinitionName="sleep10",
        type="container",
        containerProperties={
            "image": "busybox",
            "vcpus": 1,
            "memory": 128,
            "command": ["sleep", "10"],
        },
    )["jobDefinitionArn"]

    batch_client.deregister_job_definition(jobDefinition=definition_arn)

    definitions = batch_client.describe_job_definitions()["jobDefinitions"]
    assert len(definitions) == 0
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_describe_task_definition():
    """describe_job_definitions filters by name and by a list of names."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _setup(ec2_client, iam_client)

    # Two revisions of "sleep10" plus one "test1", registered in this order
    for definition_name, memory in (("sleep10", 128), ("sleep10", 64), ("test1", 64)):
        batch_client.register_job_definition(
            jobDefinitionName=definition_name,
            type="container",
            containerProperties={
                "image": "busybox",
                "vcpus": 1,
                "memory": memory,
                "command": ["sleep", "10"],
            },
        )

    by_name = batch_client.describe_job_definitions(jobDefinitionName="sleep10")
    assert len(by_name["jobDefinitions"]) == 2

    everything = batch_client.describe_job_definitions()
    assert len(everything["jobDefinitions"]) == 3

    by_list = batch_client.describe_job_definitions(jobDefinitions=["sleep10", "test1"])
    assert len(by_list["jobDefinitions"]) == 3
    assert all(
        definition["status"] == "ACTIVE" for definition in by_list["jobDefinitions"]
    )
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_submit_job_by_name():
    """Submitting with a bare definition name resolves to its latest revision."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_arn = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )["computeEnvironmentArn"]

    queue_arn = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": env_arn}],
    )["jobQueueArn"]

    # Register three revisions; only the last one's ARN is kept
    job_definition_name = "sleep10"
    latest = None
    for memory in (128, 256, 512):
        latest = batch_client.register_job_definition(
            jobDefinitionName=job_definition_name,
            type="container",
            containerProperties={
                "image": "busybox",
                "vcpus": 1,
                "memory": memory,
                "command": ["sleep", "10"],
            },
        )
    latest_definition_arn = latest["jobDefinitionArn"]

    job_id = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_definition_name
    )["jobId"]

    jobs = batch_client.describe_jobs(jobs=[job_id])["jobs"]
    assert len(jobs) == 1
    job = jobs[0]
    assert job["jobId"] == job_id
    assert job["jobQueue"] == queue_arn
    assert job["jobDefinition"] == latest_definition_arn
# SLOW TESTS
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.network
def test_submit_job():
    """Submit a job that echoes "hello" and check the message is logged.

    Waits for the job to reach SUCCEEDED, then expects exactly one log stream
    under the "/aws/batch/job" group whose events are just ["hello"].
    """
    ec2_client, iam_client, _ecs_client, logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_arn = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )["computeEnvironmentArn"]

    queue_arn = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": env_arn}],
    )["jobQueueArn"]

    job_def_arn = batch_client.register_job_definition(
        jobDefinitionName="sayhellotomylittlefriend",
        type="container",
        containerProperties={
            "image": "busybox:latest",
            "vcpus": 1,
            "memory": 128,
            "command": ["echo", "hello"],
        },
    )["jobDefinitionArn"]

    job_id = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=job_def_arn
    )["jobId"]
    _wait_for_job_status(batch_client, job_id, "SUCCEEDED")

    log_group = "/aws/batch/job"
    streams = logs_client.describe_log_streams(
        logGroupName=log_group, logStreamNamePrefix="sayhellotomylittlefriend"
    )["logStreams"]
    assert len(streams) == 1

    events = logs_client.get_log_events(
        logGroupName=log_group, logStreamName=streams[0]["logStreamName"]
    )["events"]
    assert [event["message"] for event in events] == ["hello"]
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
@pytest.mark.network
def test_list_jobs():
    """list_jobs(jobStatus="SUCCEEDED") shows jobs only once they have finished."""
    ec2_client, iam_client, _ecs_client, _logs_client, batch_client = _get_clients()
    _vpc_id, _subnet_id, _sg_id, iam_arn = _setup(ec2_client, iam_client)

    env_arn = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=iam_arn,
    )["computeEnvironmentArn"]

    queue_arn = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": env_arn}],
    )["jobQueueArn"]

    job_def_arn = batch_client.register_job_definition(
        jobDefinitionName="sleep5",
        type="container",
        containerProperties={
            "image": "busybox:latest",
            "vcpus": 1,
            "memory": 128,
            "command": ["sleep", "5"],
        },
    )["jobDefinitionArn"]

    submitted_ids = []
    for job_name in ("test1", "test2"):
        submitted = batch_client.submit_job(
            jobName=job_name, jobQueue=queue_arn, jobDefinition=job_def_arn
        )
        submitted_ids.append(submitted["jobId"])

    # Right after submission nothing has finished yet
    finished_before = batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")

    # Wait only as long as it takes to run the jobs
    for job_id in submitted_ids:
        _wait_for_job_status(batch_client, job_id, "SUCCEEDED")

    finished_after = batch_client.list_jobs(jobQueue=queue_arn, jobStatus="SUCCEEDED")

    assert len(finished_before["jobSummaryList"]) == 0
    assert len(finished_after["jobSummaryList"]) == 2
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_terminate_job():
    """Terminating a RUNNING job marks it FAILED and cuts its output short.

    The job definition runs ``echo start && sleep 30 && echo stop``; the job is
    terminated while sleeping, so only the first ``echo`` may reach the logs.
    """
    ec2_client, iam_client, _ecs_client, logs_client, batch_client = _get_clients()
    *_, role_arn = _setup(ec2_client, iam_client)

    # Unmanaged compute environment -> job queue -> job definition -> job.
    env_arn = batch_client.create_compute_environment(
        computeEnvironmentName="test_compute_env",
        type="UNMANAGED",
        state="ENABLED",
        serviceRole=role_arn,
    )["computeEnvironmentArn"]

    queue_arn = batch_client.create_job_queue(
        jobQueueName="test_job_queue",
        state="ENABLED",
        priority=123,
        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": env_arn}],
    )["jobQueueArn"]

    container_props = {
        "image": "busybox:latest",
        "vcpus": 1,
        "memory": 128,
        "command": ["sh", "-c", "echo start && sleep 30 && echo stop"],
    }
    definition_arn = batch_client.register_job_definition(
        jobDefinitionName="echo-sleep-echo",
        type="container",
        containerProperties=container_props,
    )["jobDefinitionArn"]

    submitted_id = batch_client.submit_job(
        jobName="test1", jobQueue=queue_arn, jobDefinition=definition_arn
    )["jobId"]

    # Only terminate once the container is actually executing.
    _wait_for_job_status(batch_client, submitted_id, "RUNNING")
    batch_client.terminate_job(jobId=submitted_id, reason="test_terminate")
    _wait_for_job_status(batch_client, submitted_id, "FAILED")

    job = batch_client.describe_jobs(jobs=[submitted_id])["jobs"][0]
    job["jobName"].should.equal("test1")
    job["status"].should.equal("FAILED")
    job["statusReason"].should.equal("test_terminate")

    streams = logs_client.describe_log_streams(
        logGroupName="/aws/batch/job", logStreamNamePrefix="echo-sleep-echo"
    )["logStreams"]
    streams.should.have.length_of(1)
    stream_name = streams[0]["logStreamName"]

    events = logs_client.get_log_events(
        logGroupName="/aws/batch/job", logStreamName=stream_name
    )["events"]
    # The job was killed during `sleep`, so 'stop' must never have been logged.
    events.should.have.length_of(1)
    events[0]["message"].should.equal("start")
def _wait_for_job_status(
    client, job_id, status, seconds_to_wait=30, poll_interval=0.1
):
    """Block until the Batch job ``job_id`` reports ``status``.

    Polls ``client.describe_jobs`` and compares the first returned job's
    ``status`` with the requested one. The job is always checked at least
    once, even when ``seconds_to_wait`` is 0.

    :param client: boto3 Batch client (anything exposing ``describe_jobs``).
    :param job_id: ID of the job to poll.
    :param status: Status to wait for, e.g. "RUNNING", "SUCCEEDED", "FAILED".
    :param seconds_to_wait: Maximum time to wait, in seconds.
    :param poll_interval: Pause between polls, in seconds. Prevents the loop
        from busy-spinning and competing for CPU with the backend thread that
        is running the job.
    :raises RuntimeError: if ``status`` is not observed before the deadline.
    """
    # Monotonic clock: wall-clock adjustments cannot stretch or cut the timeout.
    deadline = time.monotonic() + seconds_to_wait
    while True:
        resp = client.describe_jobs(jobs=[job_id])
        last_job_status = resp["jobs"][0]["status"]
        if last_job_status == status:
            return
        if time.monotonic() >= deadline:
            raise RuntimeError(
                "Time out waiting for job status {status}!\n Last status: {last_status}".format(
                    status=status, last_status=last_job_status
                )
            )
        time.sleep(poll_interval)