Merge pull request #687 from skygeo/ecs_tasks

Add support for ECS tasks
This commit is contained in:
Steve Pulec 2016-09-13 21:51:42 -04:00 committed by GitHub
commit 65047ce102
3 changed files with 505 additions and 1 deletions

View File

@ -1,8 +1,10 @@
from __future__ import unicode_literals
import uuid
from random import randint
from moto.core import BaseBackend
from moto.ec2 import ec2_backends
from copy import copy
class BaseObject(object):
@ -16,7 +18,7 @@ class BaseObject(object):
return ''.join(words)
def gen_response_object(self):
response_object = self.__dict__.copy()
response_object = copy(self.__dict__)
for key, value in response_object.items():
if '_' in key:
response_object[self.camelCase(key)] = value
@ -63,6 +65,25 @@ class TaskDefinition(BaseObject):
return response_object
class Task(BaseObject):
def __init__(self, cluster, task_definition, container_instance_arn, overrides={}, started_by=''):
self.cluster_arn = cluster.arn
self.task_arn = 'arn:aws:ecs:us-east-1:012345678910:task/{0}'.format(str(uuid.uuid1()))
self.container_instance_arn = container_instance_arn
self.last_status = 'RUNNING'
self.desired_status = 'RUNNING'
self.task_definition_arn = task_definition.arn
self.overrides = overrides
self.containers = []
self.started_by = started_by
self.stopped_reason = ''
@property
def response_object(self):
response_object = self.gen_response_object()
return response_object
class Service(BaseObject):
def __init__(self, cluster, service_name, task_definition, desired_count):
self.cluster_arn = cluster.arn
@ -125,6 +146,7 @@ class EC2ContainerServiceBackend(BaseBackend):
def __init__(self):
self.clusters = {}
self.task_definitions = {}
self.tasks = {}
self.services = {}
self.container_instances = {}
@ -204,6 +226,106 @@ class EC2ContainerServiceBackend(BaseBackend):
else:
raise Exception("{0} is not a task_definition".format(task_definition_name))
def run_task(self, cluster_str, task_definition_str, count, overrides, started_by):
cluster_name = cluster_str.split('/')[-1]
if cluster_name in self.clusters:
cluster = self.clusters[cluster_name]
else:
raise Exception("{0} is not a cluster".format(cluster_name))
task_definition = self.fetch_task_definition(task_definition_str)
if cluster_name not in self.tasks:
self.tasks[cluster_name] = {}
tasks = []
container_instances = list(self.container_instances.get(cluster_name, {}).keys())
if not container_instances:
raise Exception("No instances found in cluster {}".format(cluster_name))
for _ in range(count or 1):
container_instance_arn = self.container_instances[cluster_name][
container_instances[randint(0, len(container_instances) - 1)]
].containerInstanceArn
task = Task(cluster, task_definition, container_instance_arn, overrides or {}, started_by or '')
tasks.append(task)
self.tasks[cluster_name][task.task_arn] = task
return tasks
def start_task(self, cluster_str, task_definition_str, container_instances, overrides, started_by):
cluster_name = cluster_str.split('/')[-1]
if cluster_name in self.clusters:
cluster = self.clusters[cluster_name]
else:
raise Exception("{0} is not a cluster".format(cluster_name))
task_definition = self.fetch_task_definition(task_definition_str)
if cluster_name not in self.tasks:
self.tasks[cluster_name] = {}
tasks = []
if not container_instances:
raise Exception("No container instance list provided")
container_instance_ids = [x.split('/')[-1] for x in container_instances]
for container_instance_id in container_instance_ids:
container_instance_arn = self.container_instances[cluster_name][
container_instance_id
].containerInstanceArn
task = Task(cluster, task_definition, container_instance_arn, overrides or {}, started_by or '')
tasks.append(task)
self.tasks[cluster_name][task.task_arn] = task
return tasks
def describe_tasks(self, cluster_str, tasks):
cluster_name = cluster_str.split('/')[-1]
if cluster_name in self.clusters:
cluster = self.clusters[cluster_name]
else:
raise Exception("{0} is not a cluster".format(cluster_name))
if not tasks:
raise Exception("tasks cannot be empty")
response = []
for cluster, cluster_tasks in self.tasks.items():
for task_id, task in cluster_tasks.items():
if task_id in tasks or task.task_arn in tasks:
response.append(task)
return response
def list_tasks(self, cluster_str, container_instance, family, started_by, service_name, desiredStatus):
filtered_tasks = []
for cluster, tasks in self.tasks.items():
for arn, task in tasks.items():
filtered_tasks.append(task)
if cluster_str:
cluster_name = cluster_str.split('/')[-1]
if cluster_name in self.clusters:
cluster = self.clusters[cluster_name]
else:
raise Exception("{0} is not a cluster".format(cluster_name))
filtered_tasks = list(filter(lambda t: cluster_name in t.cluster_arn, filtered_tasks))
if container_instance:
filtered_tasks = list(filter(lambda t: container_instance in t.container_instance_arn, filtered_tasks))
if started_by:
filtered_tasks = list(filter(lambda t: started_by == t.started_by, filtered_tasks))
return [t.task_arn for t in filtered_tasks]
def stop_task(self, cluster_str, task_str, reason):
cluster_name = cluster_str.split('/')[-1]
if cluster_name not in self.clusters:
raise Exception("{0} is not a cluster".format(cluster_name))
if not task_str:
raise Exception("A task ID or ARN is required")
task_id = task_str.split('/')[-1]
tasks = self.tasks.get(cluster_name, None)
if not tasks:
raise Exception("Cluster {} has no registered tasks".format(cluster_name))
for task in tasks.keys():
if task.endswith(task_id):
tasks[task].last_status = 'STOPPED'
tasks[task].desired_status = 'STOPPED'
tasks[task].stopped_reason = reason
return tasks[task]
raise Exception("Could not find task {} on cluster {}".format(task_str, cluster_name))
def create_service(self, cluster_str, service_name, task_definition_str, desired_count):
cluster_name = cluster_str.split('/')[-1]
if cluster_name in self.clusters:

View File

@ -77,6 +77,62 @@ class EC2ContainerServiceResponse(BaseResponse):
'taskDefinition': task_definition.response_object
})
def run_task(self):
cluster_str = self._get_param('cluster')
overrides = self._get_param('overrides')
task_definition_str = self._get_param('taskDefinition')
count = self._get_int_param('count')
started_by = self._get_param('startedBy')
tasks = self.ecs_backend.run_task(cluster_str, task_definition_str, count, overrides, started_by)
return json.dumps({
'tasks': [task.response_object for task in tasks],
'failures': []
})
def describe_tasks(self):
cluster = self._get_param('cluster')
tasks = self._get_param('tasks')
data = self.ecs_backend.describe_tasks(cluster, tasks)
return json.dumps({
'tasks': [task.response_object for task in data],
'failures': []
})
def start_task(self):
cluster_str = self._get_param('cluster')
overrides = self._get_param('overrides')
task_definition_str = self._get_param('taskDefinition')
container_instances = self._get_param('containerInstances')
started_by = self._get_param('startedBy')
tasks = self.ecs_backend.start_task(cluster_str, task_definition_str, container_instances, overrides, started_by)
return json.dumps({
'tasks': [task.response_object for task in tasks],
'failures': []
})
def list_tasks(self):
cluster_str = self._get_param('cluster')
container_instance = self._get_param('containerInstance')
family = self._get_param('family')
started_by = self._get_param('startedBy')
service_name = self._get_param('serviceName')
desiredStatus = self._get_param('desiredStatus')
task_arns = self.ecs_backend.list_tasks(cluster_str, container_instance, family, started_by, service_name, desiredStatus)
return json.dumps({
'taskArns': task_arns
})
def stop_task(self):
cluster_str = self._get_param('cluster')
task = self._get_param('task')
reason = self._get_param('reason')
task = self.ecs_backend.stop_task(cluster_str, task, reason)
return json.dumps({
'task': task.response_object
})
def create_service(self):
cluster_str = self._get_param('cluster')
service_name = self._get_param('serviceName')

View File

@ -379,6 +379,7 @@ def test_register_container_instance():
response['containerInstance']['versionInfo']['agentHash'].should.equal('4023248')
response['containerInstance']['versionInfo']['dockerVersion'].should.equal('DockerVersion: 1.5.0')
@mock_ec2
@mock_ecs
def test_list_container_instances():
@ -453,3 +454,328 @@ def test_describe_container_instances():
response_arns = [ci['containerInstanceArn'] for ci in response['containerInstances']]
for arn in test_instance_arns:
response_arns.should.contain(arn)
@mock_ec2
@mock_ecs
def test_run_task():
client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
response = client.run_task(
cluster='test_ecs_cluster',
overrides={},
taskDefinition='test_ecs_task',
count=2,
startedBy='moto'
)
len(response['tasks']).should.equal(2)
response['tasks'][0]['taskArn'].should.contain('arn:aws:ecs:us-east-1:012345678910:task/')
response['tasks'][0]['clusterArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:cluster/test_ecs_cluster')
response['tasks'][0]['taskDefinitionArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
response['tasks'][0]['containerInstanceArn'].should.contain('arn:aws:ecs:us-east-1:012345678910:container-instance/')
response['tasks'][0]['overrides'].should.equal({})
response['tasks'][0]['lastStatus'].should.equal("RUNNING")
response['tasks'][0]['desiredStatus'].should.equal("RUNNING")
response['tasks'][0]['startedBy'].should.equal("moto")
response['tasks'][0]['stoppedReason'].should.equal("")
@mock_ec2
@mock_ecs
def test_start_task():
client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
container_instances = client.list_container_instances(cluster=test_cluster_name)
container_instance_id = container_instances['containerInstanceArns'][0].split('/')[-1]
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
response = client.start_task(
cluster='test_ecs_cluster',
taskDefinition='test_ecs_task',
overrides={},
containerInstances=[container_instance_id],
startedBy='moto'
)
len(response['tasks']).should.equal(1)
response['tasks'][0]['taskArn'].should.contain('arn:aws:ecs:us-east-1:012345678910:task/')
response['tasks'][0]['clusterArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:cluster/test_ecs_cluster')
response['tasks'][0]['taskDefinitionArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
response['tasks'][0]['containerInstanceArn'].should.equal('arn:aws:ecs:us-east-1:012345678910:container-instance/{0}'.format(container_instance_id))
response['tasks'][0]['overrides'].should.equal({})
response['tasks'][0]['lastStatus'].should.equal("RUNNING")
response['tasks'][0]['desiredStatus'].should.equal("RUNNING")
response['tasks'][0]['startedBy'].should.equal("moto")
response['tasks'][0]['stoppedReason'].should.equal("")
@mock_ec2
@mock_ecs
def test_list_tasks():
client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
container_instances = client.list_container_instances(cluster=test_cluster_name)
container_instance_id = container_instances['containerInstanceArns'][0].split('/')[-1]
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
_ = client.start_task(
cluster='test_ecs_cluster',
taskDefinition='test_ecs_task',
overrides={},
containerInstances=[container_instance_id],
startedBy='foo'
)
_ = client.start_task(
cluster='test_ecs_cluster',
taskDefinition='test_ecs_task',
overrides={},
containerInstances=[container_instance_id],
startedBy='bar'
)
assert len(client.list_tasks()['taskArns']).should.equal(2)
assert len(client.list_tasks(cluster='test_ecs_cluster')['taskArns']).should.equal(2)
assert len(client.list_tasks(startedBy='foo')['taskArns']).should.equal(1)
@mock_ec2
@mock_ecs
def test_describe_tasks():
client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
tasks_arns = [
task['taskArn'] for task in client.run_task(
cluster='test_ecs_cluster',
overrides={},
taskDefinition='test_ecs_task',
count=2,
startedBy='moto'
)['tasks']
]
response = client.describe_tasks(
cluster='test_ecs_cluster',
tasks=tasks_arns
)
len(response['tasks']).should.equal(2)
set([response['tasks'][0]['taskArn'], response['tasks'][1]['taskArn']]).should.equal(set(tasks_arns))
@mock_ec2
@mock_ecs
def test_stop_task():
client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
_ = client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
run_response = client.run_task(
cluster='test_ecs_cluster',
overrides={},
taskDefinition='test_ecs_task',
count=1,
startedBy='moto'
)
stop_response = client.stop_task(
cluster='test_ecs_cluster',
task=run_response['tasks'][0].get('taskArn'),
reason='moto testing'
)
stop_response['task']['taskArn'].should.equal(run_response['tasks'][0].get('taskArn'))
stop_response['task']['lastStatus'].should.equal('STOPPED')
stop_response['task']['desiredStatus'].should.equal('STOPPED')
stop_response['task']['stoppedReason'].should.equal('moto testing')