Feat: ECS container status updating (#831)
* Uptick boto3 version to version supporting ECS container instance state changes * Add initial status update * Only place tasks on active instances * PEP8 cleanup
This commit is contained in:
parent
bcc3e57949
commit
7d75c3ba18
@ -61,6 +61,7 @@ class Cluster(BaseObject):
|
|||||||
# ClusterName is optional in CloudFormation, thus create a random name if necessary
|
# ClusterName is optional in CloudFormation, thus create a random name if necessary
|
||||||
cluster_name=properties.get('ClusterName', 'ecscluster{0}'.format(int(random() * 10 ** 6))),
|
cluster_name=properties.get('ClusterName', 'ecscluster{0}'.format(int(random() * 10 ** 6))),
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):
|
def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):
|
||||||
properties = cloudformation_json['Properties']
|
properties = cloudformation_json['Properties']
|
||||||
@ -126,6 +127,7 @@ class TaskDefinition(BaseObject):
|
|||||||
# no-op when nothing changed between old and new resources
|
# no-op when nothing changed between old and new resources
|
||||||
return original_resource
|
return original_resource
|
||||||
|
|
||||||
|
|
||||||
class Task(BaseObject):
|
class Task(BaseObject):
|
||||||
def __init__(self, cluster, task_definition, container_instance_arn, overrides={}, started_by=''):
|
def __init__(self, cluster, task_definition, container_instance_arn, overrides={}, started_by=''):
|
||||||
self.cluster_arn = cluster.arn
|
self.cluster_arn = cluster.arn
|
||||||
@ -227,10 +229,10 @@ class ContainerInstance(BaseObject):
|
|||||||
self.remainingResources = []
|
self.remainingResources = []
|
||||||
self.runningTaskCount = 0
|
self.runningTaskCount = 0
|
||||||
self.versionInfo = {
|
self.versionInfo = {
|
||||||
'agentVersion': "1.0.0",
|
'agentVersion': "1.0.0",
|
||||||
'agentHash': '4023248',
|
'agentHash': '4023248',
|
||||||
'dockerVersion': 'DockerVersion: 1.5.0'
|
'dockerVersion': 'DockerVersion: 1.5.0'
|
||||||
}
|
}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def response_object(self):
|
def response_object(self):
|
||||||
@ -327,20 +329,6 @@ class EC2ContainerServiceBackend(BaseBackend):
|
|||||||
task_arns.extend([task_definition.arn for task_definition in task_definition_list])
|
task_arns.extend([task_definition.arn for task_definition in task_definition_list])
|
||||||
return task_arns
|
return task_arns
|
||||||
|
|
||||||
def describe_task_definition(self, task_definition_str):
|
|
||||||
task_definition_name = task_definition_str.split('/')[-1]
|
|
||||||
if ':' in task_definition_name:
|
|
||||||
family, revision = task_definition_name.split(':')
|
|
||||||
revision = int(revision)
|
|
||||||
else:
|
|
||||||
family = task_definition_name
|
|
||||||
revision = len(self.task_definitions.get(family, []))
|
|
||||||
|
|
||||||
if family in self.task_definitions and 0 < revision <= len(self.task_definitions[family]):
|
|
||||||
return self.task_definitions[family][revision-1]
|
|
||||||
else:
|
|
||||||
raise Exception("{0} is not a task_definition".format(task_definition_name))
|
|
||||||
|
|
||||||
def deregister_task_definition(self, task_definition_str):
|
def deregister_task_definition(self, task_definition_str):
|
||||||
task_definition_name = task_definition_str.split('/')[-1]
|
task_definition_name = task_definition_str.split('/')[-1]
|
||||||
family, revision = task_definition_name.split(':')
|
family, revision = task_definition_name.split(':')
|
||||||
@ -363,9 +351,11 @@ class EC2ContainerServiceBackend(BaseBackend):
|
|||||||
container_instances = list(self.container_instances.get(cluster_name, {}).keys())
|
container_instances = list(self.container_instances.get(cluster_name, {}).keys())
|
||||||
if not container_instances:
|
if not container_instances:
|
||||||
raise Exception("No instances found in cluster {}".format(cluster_name))
|
raise Exception("No instances found in cluster {}".format(cluster_name))
|
||||||
|
active_container_instances = [x for x in container_instances if
|
||||||
|
self.container_instances[cluster_name][x].status == 'ACTIVE']
|
||||||
for _ in range(count or 1):
|
for _ in range(count or 1):
|
||||||
container_instance_arn = self.container_instances[cluster_name][
|
container_instance_arn = self.container_instances[cluster_name][
|
||||||
container_instances[randint(0, len(container_instances) - 1)]
|
active_container_instances[randint(0, len(active_container_instances) - 1)]
|
||||||
].containerInstanceArn
|
].containerInstanceArn
|
||||||
task = Task(cluster, task_definition, container_instance_arn, overrides or {}, started_by or '')
|
task = Task(cluster, task_definition, container_instance_arn, overrides or {}, started_by or '')
|
||||||
tasks.append(task)
|
tasks.append(task)
|
||||||
@ -537,6 +527,25 @@ class EC2ContainerServiceBackend(BaseBackend):
|
|||||||
|
|
||||||
return container_instance_objects, failures
|
return container_instance_objects, failures
|
||||||
|
|
||||||
|
def update_container_instances_state(self, cluster_str, list_container_instance_ids, status):
|
||||||
|
cluster_name = cluster_str.split('/')[-1]
|
||||||
|
if cluster_name not in self.clusters:
|
||||||
|
raise Exception("{0} is not a cluster".format(cluster_name))
|
||||||
|
status = status.upper()
|
||||||
|
if status not in ['ACTIVE', 'DRAINING']:
|
||||||
|
raise Exception("An error occurred (InvalidParameterException) when calling the UpdateContainerInstancesState operation: Container instances status should be one of [ACTIVE,DRAINING]")
|
||||||
|
failures = []
|
||||||
|
container_instance_objects = []
|
||||||
|
for container_instance_id in list_container_instance_ids:
|
||||||
|
container_instance = self.container_instances[cluster_name].get(container_instance_id, None)
|
||||||
|
if container_instance is not None:
|
||||||
|
container_instance.status = status
|
||||||
|
container_instance_objects.append(container_instance)
|
||||||
|
else:
|
||||||
|
failures.append(ContainerInstanceFailure('MISSING', container_instance_id))
|
||||||
|
|
||||||
|
return container_instance_objects, failures
|
||||||
|
|
||||||
def deregister_container_instance(self, cluster_str, container_instance_str):
|
def deregister_container_instance(self, cluster_str, container_instance_str):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
import json
|
import json
|
||||||
import uuid
|
|
||||||
|
|
||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
from .models import ecs_backends
|
from .models import ecs_backends
|
||||||
@ -34,8 +33,8 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
cluster_arns = self.ecs_backend.list_clusters()
|
cluster_arns = self.ecs_backend.list_clusters()
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'clusterArns': cluster_arns
|
'clusterArns': cluster_arns
|
||||||
#,
|
# ,
|
||||||
#'nextToken': str(uuid.uuid1())
|
# 'nextToken': str(uuid.uuid1())
|
||||||
})
|
})
|
||||||
|
|
||||||
def describe_clusters(self):
|
def describe_clusters(self):
|
||||||
@ -66,15 +65,8 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
task_definition_arns = self.ecs_backend.list_task_definitions()
|
task_definition_arns = self.ecs_backend.list_task_definitions()
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'taskDefinitionArns': task_definition_arns
|
'taskDefinitionArns': task_definition_arns
|
||||||
#,
|
# ,
|
||||||
#'nextToken': str(uuid.uuid1())
|
# 'nextToken': str(uuid.uuid1())
|
||||||
})
|
|
||||||
|
|
||||||
def describe_task_definition(self):
|
|
||||||
task_definition_str = self._get_param('taskDefinition')
|
|
||||||
task_definition = self.ecs_backend.describe_task_definition(task_definition_str)
|
|
||||||
return json.dumps({
|
|
||||||
'taskDefinition': task_definition.response_object
|
|
||||||
})
|
})
|
||||||
|
|
||||||
def deregister_task_definition(self):
|
def deregister_task_definition(self):
|
||||||
@ -94,7 +86,7 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
return json.dumps({
|
return json.dumps({
|
||||||
'tasks': [task.response_object for task in tasks],
|
'tasks': [task.response_object for task in tasks],
|
||||||
'failures': []
|
'failures': []
|
||||||
})
|
})
|
||||||
|
|
||||||
def describe_tasks(self):
|
def describe_tasks(self):
|
||||||
cluster = self._get_param('cluster')
|
cluster = self._get_param('cluster')
|
||||||
@ -123,7 +115,7 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
return json.dumps({
|
return json.dumps({
|
||||||
'tasks': [task.response_object for task in tasks],
|
'tasks': [task.response_object for task in tasks],
|
||||||
'failures': []
|
'failures': []
|
||||||
})
|
})
|
||||||
|
|
||||||
def list_tasks(self):
|
def list_tasks(self):
|
||||||
cluster_str = self._get_param('cluster')
|
cluster_str = self._get_param('cluster')
|
||||||
@ -135,8 +127,7 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
task_arns = self.ecs_backend.list_tasks(cluster_str, container_instance, family, started_by, service_name, desiredStatus)
|
task_arns = self.ecs_backend.list_tasks(cluster_str, container_instance, family, started_by, service_name, desiredStatus)
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'taskArns': task_arns
|
'taskArns': task_arns
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
def stop_task(self):
|
def stop_task(self):
|
||||||
cluster_str = self._get_param('cluster')
|
cluster_str = self._get_param('cluster')
|
||||||
@ -145,8 +136,7 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
task = self.ecs_backend.stop_task(cluster_str, task, reason)
|
task = self.ecs_backend.stop_task(cluster_str, task, reason)
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'task': task.response_object
|
'task': task.response_object
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
def create_service(self):
|
def create_service(self):
|
||||||
cluster_str = self._get_param('cluster')
|
cluster_str = self._get_param('cluster')
|
||||||
@ -201,7 +191,7 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
ec2_instance_id = instance_identity_document["instanceId"]
|
ec2_instance_id = instance_identity_document["instanceId"]
|
||||||
container_instance = self.ecs_backend.register_container_instance(cluster_str, ec2_instance_id)
|
container_instance = self.ecs_backend.register_container_instance(cluster_str, ec2_instance_id)
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'containerInstance' : container_instance.response_object
|
'containerInstance': container_instance.response_object
|
||||||
})
|
})
|
||||||
|
|
||||||
def list_container_instances(self):
|
def list_container_instances(self):
|
||||||
@ -216,6 +206,16 @@ class EC2ContainerServiceResponse(BaseResponse):
|
|||||||
list_container_instance_arns = self._get_param('containerInstances')
|
list_container_instance_arns = self._get_param('containerInstances')
|
||||||
container_instances, failures = self.ecs_backend.describe_container_instances(cluster_str, list_container_instance_arns)
|
container_instances, failures = self.ecs_backend.describe_container_instances(cluster_str, list_container_instance_arns)
|
||||||
return json.dumps({
|
return json.dumps({
|
||||||
'failures': [ci.response_object for ci in failures],
|
'failures': [ci.response_object for ci in failures],
|
||||||
'containerInstances': [ci.response_object for ci in container_instances]
|
'containerInstances': [ci.response_object for ci in container_instances]
|
||||||
|
})
|
||||||
|
|
||||||
|
def update_container_instances_state(self):
|
||||||
|
cluster_str = self._get_param('cluster')
|
||||||
|
list_container_instance_arns = self._get_param('containerInstances')
|
||||||
|
status_str = self._get_param('status')
|
||||||
|
container_instances, failures = self.ecs_backend.update_container_instances_state(cluster_str, list_container_instance_arns, status_str)
|
||||||
|
return json.dumps({
|
||||||
|
'failures': [ci.response_object for ci in failures],
|
||||||
|
'containerInstances': [ci.response_object for ci in container_instances]
|
||||||
})
|
})
|
||||||
|
@ -5,6 +5,6 @@ sure==1.2.24
|
|||||||
coverage
|
coverage
|
||||||
freezegun
|
freezegun
|
||||||
flask
|
flask
|
||||||
boto3>=1.3.1
|
boto3>=1.4.4
|
||||||
botocore>=1.4.28
|
botocore>=1.4.28
|
||||||
six
|
six
|
||||||
|
@ -573,6 +573,58 @@ def test_describe_container_instances():
|
|||||||
for arn in test_instance_arns:
|
for arn in test_instance_arns:
|
||||||
response_arns.should.contain(arn)
|
response_arns.should.contain(arn)
|
||||||
|
|
||||||
|
@mock_ec2
|
||||||
|
@mock_ecs
|
||||||
|
def test_update_container_instances_state():
|
||||||
|
ecs_client = boto3.client('ecs', region_name='us-east-1')
|
||||||
|
ec2 = boto3.resource('ec2', region_name='us-east-1')
|
||||||
|
|
||||||
|
test_cluster_name = 'test_ecs_cluster'
|
||||||
|
_ = ecs_client.create_cluster(
|
||||||
|
clusterName=test_cluster_name
|
||||||
|
)
|
||||||
|
|
||||||
|
instance_to_create = 3
|
||||||
|
test_instance_arns = []
|
||||||
|
for i in range(0, instance_to_create):
|
||||||
|
test_instance = ec2.create_instances(
|
||||||
|
ImageId="ami-1234abcd",
|
||||||
|
MinCount=1,
|
||||||
|
MaxCount=1,
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
instance_id_document = json.dumps(
|
||||||
|
ec2_utils.generate_instance_identity_document(test_instance)
|
||||||
|
)
|
||||||
|
|
||||||
|
response = ecs_client.register_container_instance(
|
||||||
|
cluster=test_cluster_name,
|
||||||
|
instanceIdentityDocument=instance_id_document)
|
||||||
|
|
||||||
|
test_instance_arns.append(response['containerInstance']['containerInstanceArn'])
|
||||||
|
|
||||||
|
test_instance_ids = list(map((lambda x: x.split('/')[1]), test_instance_arns))
|
||||||
|
response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='DRAINING')
|
||||||
|
len(response['failures']).should.equal(0)
|
||||||
|
len(response['containerInstances']).should.equal(instance_to_create)
|
||||||
|
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||||
|
for status in response_statuses:
|
||||||
|
status.should.equal('DRAINING')
|
||||||
|
response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='DRAINING')
|
||||||
|
len(response['failures']).should.equal(0)
|
||||||
|
len(response['containerInstances']).should.equal(instance_to_create)
|
||||||
|
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||||
|
for status in response_statuses:
|
||||||
|
status.should.equal('DRAINING')
|
||||||
|
response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='ACTIVE')
|
||||||
|
len(response['failures']).should.equal(0)
|
||||||
|
len(response['containerInstances']).should.equal(instance_to_create)
|
||||||
|
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||||
|
for status in response_statuses:
|
||||||
|
status.should.equal('ACTIVE')
|
||||||
|
ecs_client.update_container_instances_state.when.called_with(cluster=test_cluster_name, containerInstances=test_instance_ids, status='test_status').should.throw(Exception)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@mock_ec2
|
@mock_ec2
|
||||||
@mock_ecs
|
@mock_ecs
|
||||||
@ -861,6 +913,7 @@ def describe_task_definition():
|
|||||||
task['taskDefinitionArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task2:1')
|
task['taskDefinitionArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task2:1')
|
||||||
task['volumes'].should.equal([])
|
task['volumes'].should.equal([])
|
||||||
|
|
||||||
|
|
||||||
@mock_ec2
|
@mock_ec2
|
||||||
@mock_ecs
|
@mock_ecs
|
||||||
def test_stop_task():
|
def test_stop_task():
|
||||||
|
Loading…
Reference in New Issue
Block a user