diff --git a/moto/ecs/models.py b/moto/ecs/models.py index e5a2e9f96..aadc76bce 100644 --- a/moto/ecs/models.py +++ b/moto/ecs/models.py @@ -1,6 +1,6 @@ from __future__ import unicode_literals import uuid -from random import randint, random +from random import random from moto.core import BaseBackend, BaseModel from moto.ec2 import ec2_backends @@ -130,7 +130,6 @@ class TaskDefinition(BaseObject): original_resource.volumes != volumes): # currently TaskRoleArn isn't stored at TaskDefinition # instances - ecs_backend = ecs_backends[region_name] ecs_backend.deregister_task_definition(original_resource.arn) return ecs_backend.register_task_definition( @@ -142,7 +141,8 @@ class TaskDefinition(BaseObject): class Task(BaseObject): - def __init__(self, cluster, task_definition, container_instance_arn, overrides={}, started_by=''): + def __init__(self, cluster, task_definition, container_instance_arn, + resource_requirements, overrides={}, started_by=''): self.cluster_arn = cluster.arn self.task_arn = 'arn:aws:ecs:us-east-1:012345678910:task/{0}'.format( str(uuid.uuid1())) @@ -154,6 +154,7 @@ class Task(BaseObject): self.containers = [] self.started_by = started_by self.stopped_reason = '' + self.resource_requirements = resource_requirements @property def response_object(self): @@ -240,12 +241,57 @@ class ContainerInstance(BaseObject): def __init__(self, ec2_instance_id): self.ec2_instance_id = ec2_instance_id self.status = 'ACTIVE' - self.registeredResources = [] + self.registeredResources = [ + {'doubleValue': 0.0, + 'integerValue': 4096, + 'longValue': 0, + 'name': 'CPU', + 'type': 'INTEGER'}, + {'doubleValue': 0.0, + 'integerValue': 7482, + 'longValue': 0, + 'name': 'MEMORY', + 'type': 'INTEGER'}, + {'doubleValue': 0.0, + 'integerValue': 0, + 'longValue': 0, + 'name': 'PORTS', + 'stringSetValue': ['22', '2376', '2375', '51678', '51679'], + 'type': 'STRINGSET'}, + {'doubleValue': 0.0, + 'integerValue': 0, + 'longValue': 0, + 'name': 'PORTS_UDP', + 'stringSetValue': [], + 'type': 'STRINGSET'}] self.agentConnected = True self.containerInstanceArn = "arn:aws:ecs:us-east-1:012345678910:container-instance/{0}".format( str(uuid.uuid1())) self.pendingTaskCount = 0 - self.remainingResources = [] + self.remainingResources = [ + {'doubleValue': 0.0, + 'integerValue': 4096, + 'longValue': 0, + 'name': 'CPU', + 'type': 'INTEGER'}, + {'doubleValue': 0.0, + 'integerValue': 7482, + 'longValue': 0, + 'name': 'MEMORY', + 'type': 'INTEGER'}, + {'doubleValue': 0.0, + 'integerValue': 0, + 'longValue': 0, + 'name': 'PORTS', + 'stringSetValue': ['22', '2376', '2375', '51678', '51679'], + 'type': 'STRINGSET'}, + {'doubleValue': 0.0, + 'integerValue': 0, + 'longValue': 0, + 'name': 'PORTS_UDP', + 'stringSetValue': [], + 'type': 'STRINGSET'} + ] self.runningTaskCount = 0 self.versionInfo = { 'agentVersion': "1.0.0", @@ -380,20 +426,72 @@ class EC2ContainerServiceBackend(BaseBackend): container_instances = list( self.container_instances.get(cluster_name, {}).keys()) if not container_instances: - raise Exception( - "No instances found in cluster {}".format(cluster_name)) + raise Exception("No instances found in cluster {}".format(cluster_name)) active_container_instances = [x for x in container_instances if self.container_instances[cluster_name][x].status == 'ACTIVE'] - for _ in range(count or 1): - container_instance_arn = self.container_instances[cluster_name][ - active_container_instances[randint(0, len(active_container_instances) - 1)] - ].containerInstanceArn - task = Task(cluster, task_definition, container_instance_arn, - overrides or {}, started_by or '') - tasks.append(task) - self.tasks[cluster_name][task.task_arn] = task + resource_requirements = self._calculate_task_resource_requirements(task_definition) + # TODO: return event about unable to place task if not able to place enough tasks to meet count + placed_count = 0 + for container_instance in active_container_instances: + container_instance = self.container_instances[cluster_name][container_instance] + container_instance_arn = container_instance.containerInstanceArn + try_to_place = True + while try_to_place: + can_be_placed, message = self._can_be_placed(container_instance, resource_requirements) + if can_be_placed: + task = Task(cluster, task_definition, container_instance_arn, + resource_requirements, overrides or {}, started_by or '') + self.update_container_instance_resources(container_instance, resource_requirements) + tasks.append(task) + self.tasks[cluster_name][task.task_arn] = task + placed_count += 1 + if placed_count == count: + return tasks + else: + try_to_place = False return tasks + @staticmethod + def _calculate_task_resource_requirements(task_definition): + resource_requirements = {"CPU": 0, "MEMORY": 0, "PORTS": [], "PORTS_UDP": []} + for container_definition in task_definition.container_definitions: + resource_requirements["CPU"] += container_definition.get('cpu') + resource_requirements["MEMORY"] += container_definition.get("memory") + for port_mapping in container_definition.get("portMappings", []): + resource_requirements["PORTS"].append(port_mapping.get('hostPort')) + return resource_requirements + + @staticmethod + def _can_be_placed(container_instance, task_resource_requirements): + """ + + :param container_instance: The container instance trying to be placed onto + :param task_resource_requirements: The calculated resource requirements of the task in the form of a dict + :return: A boolean stating whether the given container instance has enough resources to have the task placed on + it as well as a description, if it cannot be placed this will describe why. + """ + # TODO: Implement default and other placement strategies as well as constraints: + # docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html + remaining_cpu = 0 + remaining_memory = 0 + reserved_ports = [] + for resource in container_instance.remainingResources: + if resource.get("name") == "CPU": + remaining_cpu = resource.get("integerValue") + elif resource.get("name") == "MEMORY": + remaining_memory = resource.get("integerValue") + elif resource.get("name") == "PORTS": + reserved_ports = resource.get("stringSetValue") + if task_resource_requirements.get("CPU") > remaining_cpu: + return False, "Not enough CPU credits" + if task_resource_requirements.get("MEMORY") > remaining_memory: + return False, "Not enough memory" + ports_needed = task_resource_requirements.get("PORTS") + for port in ports_needed: + if str(port) in reserved_ports: + return False, "Port clash" + return True, "Can be placed" + def start_task(self, cluster_str, task_definition_str, container_instances, overrides, started_by): cluster_name = cluster_str.split('/')[-1] if cluster_name in self.clusters: @@ -409,14 +507,15 @@ class EC2ContainerServiceBackend(BaseBackend): container_instance_ids = [x.split('/')[-1] for x in container_instances] - + resource_requirements = self._calculate_task_resource_requirements(task_definition) for container_instance_id in container_instance_ids: - container_instance_arn = self.container_instances[cluster_name][ + container_instance = self.container_instances[cluster_name][ container_instance_id - ].containerInstanceArn - task = Task(cluster, task_definition, container_instance_arn, - overrides or {}, started_by or '') + ] + task = Task(cluster, task_definition, container_instance.containerInstanceArn, + resource_requirements, overrides or {}, started_by or '') tasks.append(task) + self.update_container_instance_resources(container_instance, resource_requirements) self.tasks[cluster_name][task.task_arn] = task return tasks @@ -470,6 +569,10 @@ class EC2ContainerServiceBackend(BaseBackend): "Cluster {} has no registered tasks".format(cluster_name)) for task in tasks.keys(): if task.endswith(task_id): + container_instance_arn = tasks[task].container_instance_arn + container_instance = self.container_instances[cluster_name][container_instance_arn.split('/')[-1]] + self.update_container_instance_resources(container_instance, tasks[task].resource_requirements, + removing=True) tasks[task].last_status = 'STOPPED' tasks[task].desired_status = 'STOPPED' tasks[task].stopped_reason = reason @@ -566,6 +669,7 @@ class EC2ContainerServiceBackend(BaseBackend): failures = [] container_instance_objects = [] for container_instance_id in list_container_instance_ids: + container_instance_id = container_instance_id.split('/')[-1] container_instance = self.container_instances[ cluster_name].get(container_instance_id, None) if container_instance is not None: @@ -582,7 +686,8 @@ class EC2ContainerServiceBackend(BaseBackend): raise Exception("{0} is not a cluster".format(cluster_name)) status = status.upper() if status not in ['ACTIVE', 'DRAINING']: - raise Exception("An error occurred (InvalidParameterException) when calling the UpdateContainerInstancesState operation: Container instances status should be one of [ACTIVE,DRAINING]") + raise Exception("An error occurred (InvalidParameterException) when calling the UpdateContainerInstancesState operation: \ + Container instances status should be one of [ACTIVE,DRAINING]") failures = [] container_instance_objects = [] for container_instance_id in list_container_instance_ids: @@ -595,6 +700,22 @@ class EC2ContainerServiceBackend(BaseBackend): return container_instance_objects, failures + def update_container_instance_resources(self, container_instance, task_resources, removing=False): + resource_multiplier = 1 + if removing: + resource_multiplier = -1 + for resource in container_instance.remainingResources: + if resource.get("name") == "CPU": + resource["integerValue"] -= task_resources.get('CPU') * resource_multiplier + elif resource.get("name") == "MEMORY": + resource["integerValue"] -= task_resources.get('MEMORY') * resource_multiplier + elif resource.get("name") == "PORTS": + for port in task_resources.get("PORTS"): + if removing: + resource["stringSetValue"].remove(str(port)) + else: + resource["stringSetValue"].append(str(port)) + def deregister_container_instance(self, cluster_str, container_instance_str): pass diff --git a/moto/ecs/responses.py b/moto/ecs/responses.py index 338cfec28..9c4524816 100644 --- a/moto/ecs/responses.py +++ b/moto/ecs/responses.py @@ -225,7 +225,9 @@ class EC2ContainerServiceResponse(BaseResponse): cluster_str = self._get_param('cluster') list_container_instance_arns = self._get_param('containerInstances') status_str = self._get_param('status') - container_instances, failures = self.ecs_backend.update_container_instances_state(cluster_str, list_container_instance_arns, status_str) + container_instances, failures = self.ecs_backend.update_container_instances_state(cluster_str, + list_container_instance_arns, + status_str) return json.dumps({ 'failures': [ci.response_object for ci in failures], 'containerInstances': [ci.response_object for ci in container_instances] diff --git a/tests/test_ecs/test_ecs_boto3.py b/tests/test_ecs/test_ecs_boto3.py index 82b6be195..e76be8cbe 100644 --- a/tests/test_ecs/test_ecs_boto3.py +++ b/tests/test_ecs/test_ecs_boto3.py @@ -531,8 +531,8 @@ def test_register_container_instance(): 'arn:aws:ecs:us-east-1:012345678910:container-instance') arn_part[1].should.equal(str(UUID(arn_part[1]))) response['containerInstance']['status'].should.equal('ACTIVE') - len(response['containerInstance']['registeredResources']).should.equal(0) - len(response['containerInstance']['remainingResources']).should.equal(0) + len(response['containerInstance']['registeredResources']).should.equal(4) + len(response['containerInstance']['remainingResources']).should.equal(4) response['containerInstance']['agentConnected'].should.equal(True) response['containerInstance']['versionInfo'][ 'agentVersion'].should.equal('1.0.0') @@ -622,6 +622,7 @@ def test_describe_container_instances(): for arn in test_instance_arns: response_arns.should.contain(arn) + @mock_ec2 @mock_ecs def test_update_container_instances_state(): @@ -653,26 +654,33 @@ def test_update_container_instances_state(): test_instance_arns.append(response['containerInstance']['containerInstanceArn']) test_instance_ids = list(map((lambda x: x.split('/')[1]), test_instance_arns)) - response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='DRAINING') + response = ecs_client.update_container_instances_state(cluster=test_cluster_name, + containerInstances=test_instance_ids, + status='DRAINING') len(response['failures']).should.equal(0) len(response['containerInstances']).should.equal(instance_to_create) response_statuses = [ci['status'] for ci in response['containerInstances']] for status in response_statuses: status.should.equal('DRAINING') - response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='DRAINING') + response = ecs_client.update_container_instances_state(cluster=test_cluster_name, + containerInstances=test_instance_ids, + status='DRAINING') len(response['failures']).should.equal(0) len(response['containerInstances']).should.equal(instance_to_create) response_statuses = [ci['status'] for ci in response['containerInstances']] for status in response_statuses: status.should.equal('DRAINING') - response = ecs_client.update_container_instances_state(cluster=test_cluster_name, containerInstances=test_instance_ids, status='ACTIVE') + response = ecs_client.update_container_instances_state(cluster=test_cluster_name, + containerInstances=test_instance_ids, + status='ACTIVE') len(response['failures']).should.equal(0) len(response['containerInstances']).should.equal(instance_to_create) response_statuses = [ci['status'] for ci in response['containerInstances']] for status in response_statuses: status.should.equal('ACTIVE') - ecs_client.update_container_instances_state.when.called_with(cluster=test_cluster_name, containerInstances=test_instance_ids, status='test_status').should.throw(Exception) - + ecs_client.update_container_instances_state.when.called_with(cluster=test_cluster_name, + containerInstances=test_instance_ids, + status='test_status').should.throw(Exception) @mock_ec2 @@ -838,7 +846,7 @@ def test_list_tasks(): ec2_utils.generate_instance_identity_document(test_instance) ) - response = client.register_container_instance( + _ = client.register_container_instance( cluster=test_cluster_name, instanceIdentityDocument=instance_id_document ) @@ -1042,6 +1050,88 @@ def test_stop_task(): stop_response['task']['stoppedReason'].should.equal('moto testing') +@mock_ec2 +@mock_ecs +def test_resource_reservation_and_release(): + client = boto3.client('ecs', region_name='us-east-1') + ec2 = boto3.resource('ec2', region_name='us-east-1') + + test_cluster_name = 'test_ecs_cluster' + + _ = client.create_cluster( + clusterName=test_cluster_name + ) + + test_instance = ec2.create_instances( + ImageId="ami-1234abcd", + MinCount=1, + MaxCount=1, + )[0] + + instance_id_document = json.dumps( + ec2_utils.generate_instance_identity_document(test_instance) + ) + + _ = client.register_container_instance( + cluster=test_cluster_name, + instanceIdentityDocument=instance_id_document + ) + + _ = client.register_task_definition( + family='test_ecs_task', + containerDefinitions=[ + { + 'name': 'hello_world', + 'image': 'docker/hello-world:latest', + 'cpu': 1024, + 'memory': 400, + 'essential': True, + 'environment': [{ + 'name': 'AWS_ACCESS_KEY_ID', + 'value': 'SOME_ACCESS_KEY' + }], + 'logConfiguration': {'logDriver': 'json-file'}, + 'portMappings': [ + { + 'hostPort': 80, + 'containerPort': 8080 + } + ] + } + ] + ) + run_response = client.run_task( + cluster='test_ecs_cluster', + overrides={}, + taskDefinition='test_ecs_task', + count=1, + startedBy='moto' + ) + container_instance_arn = run_response['tasks'][0].get('containerInstanceArn') + container_instance_description = client.describe_container_instances( + cluster='test_ecs_cluster', + containerInstances=[container_instance_arn] + )['containerInstances'][0] + remaining_resources, registered_resources = _fetch_container_instance_resources(container_instance_description) + remaining_resources['CPU'].should.equal(registered_resources['CPU'] - 1024) + remaining_resources['MEMORY'].should.equal(registered_resources['MEMORY'] - 400) + registered_resources['PORTS'].append('80') + remaining_resources['PORTS'].should.equal(registered_resources['PORTS']) + client.stop_task( + cluster='test_ecs_cluster', + task=run_response['tasks'][0].get('taskArn'), + reason='moto testing' + ) + container_instance_description = client.describe_container_instances( + cluster='test_ecs_cluster', + containerInstances=[container_instance_arn] + )['containerInstances'][0] + remaining_resources, registered_resources = _fetch_container_instance_resources(container_instance_description) + remaining_resources['CPU'].should.equal(registered_resources['CPU']) + remaining_resources['MEMORY'].should.equal(registered_resources['MEMORY']) + remaining_resources['PORTS'].should.equal(registered_resources['PORTS']) + + @mock_ecs @mock_cloudformation def test_create_cluster_through_cloudformation(): @@ -1141,6 +1231,133 @@ def test_create_task_definition_through_cloudformation(): len(resp['taskDefinitionArns']).should.equal(1) +@mock_ec2 +@mock_ecs +def test_task_definitions_unable_to_be_placed(): + client = boto3.client('ecs', region_name='us-east-1') + ec2 = boto3.resource('ec2', region_name='us-east-1') + + test_cluster_name = 'test_ecs_cluster' + + _ = client.create_cluster( + clusterName=test_cluster_name + ) + + test_instance = ec2.create_instances( + ImageId="ami-1234abcd", + MinCount=1, + MaxCount=1, + )[0] + + instance_id_document = json.dumps( + ec2_utils.generate_instance_identity_document(test_instance) + ) + + response = client.register_container_instance( + cluster=test_cluster_name, + instanceIdentityDocument=instance_id_document + ) + + _ = client.register_task_definition( + family='test_ecs_task', + containerDefinitions=[ + { + 'name': 'hello_world', + 'image': 'docker/hello-world:latest', + 'cpu': 5000, + 'memory': 40000, + 'essential': True, + 'environment': [{ + 'name': 'AWS_ACCESS_KEY_ID', + 'value': 'SOME_ACCESS_KEY' + }], + 'logConfiguration': {'logDriver': 'json-file'} + } + ] + ) + response = client.run_task( + cluster='test_ecs_cluster', + overrides={}, + taskDefinition='test_ecs_task', + count=2, + startedBy='moto' + ) + len(response['tasks']).should.equal(0) + + +@mock_ec2 +@mock_ecs +def test_task_definitions_with_port_clash(): + client = boto3.client('ecs', region_name='us-east-1') + ec2 = boto3.resource('ec2', region_name='us-east-1') + + test_cluster_name = 'test_ecs_cluster' + + _ = client.create_cluster( + clusterName=test_cluster_name + ) + + test_instance = ec2.create_instances( + ImageId="ami-1234abcd", + MinCount=1, + MaxCount=1, + )[0] + + instance_id_document = json.dumps( + ec2_utils.generate_instance_identity_document(test_instance) + ) + + response = client.register_container_instance( + cluster=test_cluster_name, + instanceIdentityDocument=instance_id_document + ) + + _ = client.register_task_definition( + family='test_ecs_task', + containerDefinitions=[ + { + 'name': 'hello_world', + 'image': 'docker/hello-world:latest', + 'cpu': 256, + 'memory': 512, + 'essential': True, + 'environment': [{ + 'name': 'AWS_ACCESS_KEY_ID', + 'value': 'SOME_ACCESS_KEY' + }], + 'logConfiguration': {'logDriver': 'json-file'}, + 'portMappings': [ + { + 'hostPort': 80, + 'containerPort': 8080 + } + ] + } + ] + ) + response = client.run_task( + cluster='test_ecs_cluster', + overrides={}, + taskDefinition='test_ecs_task', + count=2, + startedBy='moto' + ) + len(response['tasks']).should.equal(1) + response['tasks'][0]['taskArn'].should.contain( + 'arn:aws:ecs:us-east-1:012345678910:task/') + response['tasks'][0]['clusterArn'].should.equal( + 'arn:aws:ecs:us-east-1:012345678910:cluster/test_ecs_cluster') + response['tasks'][0]['taskDefinitionArn'].should.equal( + 'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1') + response['tasks'][0]['containerInstanceArn'].should.contain( + 'arn:aws:ecs:us-east-1:012345678910:container-instance/') + response['tasks'][0]['overrides'].should.equal({}) + response['tasks'][0]['lastStatus'].should.equal("RUNNING") + response['tasks'][0]['desiredStatus'].should.equal("RUNNING") + response['tasks'][0]['startedBy'].should.equal("moto") + response['tasks'][0]['stoppedReason'].should.equal("") + + @mock_ecs @mock_cloudformation def test_update_task_definition_family_through_cloudformation_should_trigger_a_replacement(): @@ -1294,3 +1511,17 @@ def test_update_service_through_cloudformation_should_trigger_replacement(): ecs_conn = boto3.client('ecs', region_name='us-west-1') resp = ecs_conn.list_services(cluster='testcluster') len(resp['serviceArns']).should.equal(1) + + +def _fetch_container_instance_resources(container_instance_description): + remaining_resources = {} + registered_resources = {} + remaining_resources_list = container_instance_description['remainingResources'] + registered_resources_list = container_instance_description['registeredResources'] + remaining_resources['CPU'] = [x['integerValue'] for x in remaining_resources_list if x['name'] == 'CPU'][0] + remaining_resources['MEMORY'] = [x['integerValue'] for x in remaining_resources_list if x['name'] == 'MEMORY'][0] + remaining_resources['PORTS'] = [x['stringSetValue'] for x in remaining_resources_list if x['name'] == 'PORTS'][0] + registered_resources['CPU'] = [x['integerValue'] for x in registered_resources_list if x['name'] == 'CPU'][0] + registered_resources['MEMORY'] = [x['integerValue'] for x in registered_resources_list if x['name'] == 'MEMORY'][0] + registered_resources['PORTS'] = [x['stringSetValue'] for x in registered_resources_list if x['name'] == 'PORTS'][0] + return remaining_resources, registered_resources