diff --git a/moto/emr/models.py b/moto/emr/models.py index 74050fed7..1f2459d6c 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -17,6 +17,8 @@ from .utils import ( EmrSecurityGroupManager, ) +EXAMPLE_AMI_ID = "ami-12c6146b" + class FakeApplication(BaseModel): def __init__(self, name, version, args=None, additional_info=None): @@ -33,6 +35,16 @@ class FakeBootstrapAction(BaseModel): self.script_path = script_path +class FakeInstance(BaseModel): + def __init__( + self, ec2_instance_id, instance_group, instance_fleet_id=None, id=None, + ): + self.id = id or random_instance_group_id() + self.ec2_instance_id = ec2_instance_id + self.instance_group = instance_group + self.instance_fleet_id = instance_fleet_id + + class FakeInstanceGroup(BaseModel): def __init__( self, @@ -177,6 +189,7 @@ class FakeCluster(BaseModel): self.set_visibility(visible_to_all_users) self.instance_group_ids = [] + self.instances = [] self.master_instance_group_id = None self.core_instance_group_id = None if ( @@ -319,6 +332,9 @@ class FakeCluster(BaseModel): self.core_instance_group_id = instance_group.id self.instance_group_ids.append(instance_group.id) + def add_instance(self, instance): + self.instances.append(instance) + def add_steps(self, steps): added_steps = [] for step in steps: @@ -390,6 +406,17 @@ class ElasticMapReduceBackend(BaseBackend): result_groups.append(group) return result_groups + def add_instances(self, cluster_id, instances, instance_group): + cluster = self.clusters[cluster_id] + response = self.ec2_backend.add_instances( + EXAMPLE_AMI_ID, instances["instance_count"], "", [], **instances + ) + for instance in response.instances: + instance = FakeInstance( + ec2_instance_id=instance.id, instance_group=instance_group, + ) + cluster.add_instance(instance) + def add_job_flow_steps(self, job_flow_id, steps): cluster = self.clusters[job_flow_id] steps = cluster.add_steps(steps) @@ -485,6 +512,25 @@ class ElasticMapReduceBackend(BaseBackend): ) return groups[start_idx : start_idx + max_items], marker + def list_instances( + self, cluster_id, marker=None, instance_group_id=None, instance_group_types=None + ): + max_items = 50 + groups = sorted(self.clusters[cluster_id].instances, key=lambda x: x.id) + start_idx = 0 if marker is None else int(marker) + marker = ( + None if len(groups) <= start_idx + max_items else str(start_idx + max_items) + ) + if instance_group_id: + groups = [g for g in groups if g.instance_group.id == instance_group_id] + if instance_group_types: + groups = [ + g for g in groups if g.instance_group.role in instance_group_types + ] + for g in groups: + g.details = self.ec2_backend.get_instance(g.ec2_instance_id) + return groups[start_idx : start_idx + max_items], marker + def list_steps(self, cluster_id, marker=None, step_ids=None, step_states=None): max_items = 50 steps = self.clusters[cluster_id].steps diff --git a/moto/emr/responses.py b/moto/emr/responses.py index a5d98ced4..3eaa3b49c 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -185,8 +185,20 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups, marker=marker) + @generate_boto3_response("ListInstances") def list_instances(self): - raise NotImplementedError + cluster_id = self._get_param("ClusterId") + marker = self._get_param("Marker") + instance_group_id = self._get_param("InstanceGroupId") + instance_group_types = self._get_param("InstanceGroupTypes") + instances, marker = self.backend.list_instances( + cluster_id, + marker=marker, + instance_group_id=instance_group_id, + instance_group_types=instance_group_types, + ) + template = self.response_template(LIST_INSTANCES_TEMPLATE) + return template.render(instances=instances, marker=marker) @generate_boto3_response("ListSteps") def list_steps(self): @@ -395,7 +407,13 @@ class ElasticMapReduceResponse(BaseResponse): self._parse_ebs_configuration(ig) # Adding support for auto_scaling_policy Unflattener.unflatten_complex_params(ig, "auto_scaling_policy") - self.backend.add_instance_groups(cluster.id, instance_groups) + instance_group_result = self.backend.add_instance_groups( + cluster.id, instance_groups + ) + for i in range(0, len(instance_group_result)): + self.backend.add_instances( + cluster.id, instance_groups[i], instance_group_result[i] + ) tags = self._get_list_prefix("Tags.member") if tags: @@ -1100,6 +1118,55 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """ + + + {% for instance in instances %} + + {{ instance.id }} + {{ instance.ec2_instance_id }} + {{ instance.details.public_dns }} + {{ instance.details.public_ip }} + {{ instance.details.private_dns }} + {{ instance.details.private_ip }} + {{ instance.instance_group.id }} + {{ instance.instance_fleet_id }} + {{ instance.instance_group.market }} + {{ instance.details.instance_type }} + + {% for volume in instance.details.block_device_mapping %} + + {{ volume }} + {{ instance.details.block_device_mapping[volume].volume_id }} + + {% endfor %} + + + {{ instance.instance_group.state }} + + {% if instance.state_change_reason is not none %} + {{ instance.state_change_reason }} + {% endif %} + + + {{ instance.instance_group.creation_datetime.isoformat() }} + {% if instance.instance_group.end_datetime is not none %} + {{ instance.instance_group.end_datetime.isoformat() }} + {% endif %} + {% if instance.instance_group.ready_datetime is not none %} + {{ instance.instance_group.ready_datetime.isoformat() }} + {% endif %} + + + + {% endfor %} + + + + 4248c46c-71c0-4772-b155-0e992dc30027 + +""" + LIST_STEPS_TEMPLATE = """ diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index e2aa49444..ad9029847 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -758,6 +758,51 @@ def test_bootstrap_actions(): x["ScriptPath"].should.equal(y["ScriptBootstrapAction"]["Path"]) +@mock_emr +def test_instances(): + input_groups = dict((g["Name"], g) for g in input_instance_groups) + client = boto3.client("emr", region_name="us-east-1") + args = deepcopy(run_job_flow_args) + args["Instances"] = {"InstanceGroups": input_instance_groups} + cluster_id = client.run_job_flow(**args)["JobFlowId"] + jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] + instances = client.list_instances(ClusterId=cluster_id)["Instances"] + len(instances).should.equal(sum(g["InstanceCount"] for g in input_instance_groups)) + for x in instances: + x.should.have.key("InstanceGroupId") + instance_group = [ + j + for j in jf["Instances"]["InstanceGroups"] + if j["InstanceGroupId"] == x["InstanceGroupId"] + ] + len(instance_group).should.equal(1) + y = input_groups[instance_group[0]["Name"]] + x.should.have.key("Id") + x.should.have.key("Ec2InstanceId") + x.should.have.key("PublicDnsName") + x.should.have.key("PublicIpAddress") + x.should.have.key("PrivateDnsName") + x.should.have.key("PrivateIpAddress") + x.should.have.key("InstanceFleetId") + x["InstanceType"].should.equal(y["InstanceType"]) + x["Market"].should.equal(y["Market"]) + x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") + x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") + x["Status"]["State"].should.equal("RUNNING") + + for x in [["MASTER"], ["CORE"], ["TASK"], ["MASTER", "TASK"]]: + instances = client.list_instances(ClusterId=cluster_id, InstanceGroupTypes=x)[ + "Instances" + ] + len(instances).should.equal( + sum( + g["InstanceCount"] + for g in input_instance_groups + if g["InstanceRole"] in x + ) + ) + + @mock_emr def test_instance_groups(): input_groups = dict((g["Name"], g) for g in input_instance_groups) @@ -800,7 +845,6 @@ def test_instance_groups(): x["ReadyDateTime"].should.be.a("datetime.datetime") x["StartDateTime"].should.be.a("datetime.datetime") x["State"].should.equal("RUNNING") - groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] for x in groups: y = deepcopy(input_groups[x["Name"]])