List instances Implementation in EMR (#3871)

* Implemented list instances in EMR

* removed import from tests

* make format

* fix W291 trailing whitespace

* removed to work for py2.7

* Store only the ec2_instance_id and the instance group in FakeInstance

Co-authored-by: J <jdeepe@147dda1b0833.ant.amazon.com>
This commit is contained in:
Deepesh J 2021-04-23 12:33:06 +05:30 committed by GitHub
parent d8be72e483
commit c31dffcc92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 3 deletions

View File

@ -17,6 +17,8 @@ from .utils import (
EmrSecurityGroupManager,
)
# AMI id handed to the EC2 backend when ElasticMapReduceBackend.add_instances
# launches the instances that back a cluster's instance group.
EXAMPLE_AMI_ID = "ami-12c6146b"
class FakeApplication(BaseModel):
def __init__(self, name, version, args=None, additional_info=None):
@ -33,6 +35,16 @@ class FakeBootstrapAction(BaseModel):
self.script_path = script_path
class FakeInstance(BaseModel):
    """A single instance tracked by a cluster.

    Holds the id of the backing EC2 instance together with the instance
    group object it was created for, plus an optional instance fleet id.
    """

    def __init__(
        self, ec2_instance_id, instance_group, instance_fleet_id=None, id=None,
    ):
        """Store the given references.

        :param ec2_instance_id: id of the backing EC2 instance.
        :param instance_group: instance group object this instance belongs to.
        :param instance_fleet_id: optional fleet id; defaults to ``None``.
        :param id: optional id for this instance; when falsy, a fresh one is
            generated with ``random_instance_group_id()``.
        """
        if not id:
            # No usable id supplied by the caller: generate one.
            id = random_instance_group_id()
        self.id = id
        self.instance_fleet_id = instance_fleet_id
        self.instance_group = instance_group
        self.ec2_instance_id = ec2_instance_id
class FakeInstanceGroup(BaseModel):
def __init__(
self,
@ -177,6 +189,7 @@ class FakeCluster(BaseModel):
self.set_visibility(visible_to_all_users)
self.instance_group_ids = []
self.instances = []
self.master_instance_group_id = None
self.core_instance_group_id = None
if (
@ -319,6 +332,9 @@ class FakeCluster(BaseModel):
self.core_instance_group_id = instance_group.id
self.instance_group_ids.append(instance_group.id)
def add_instance(self, instance):
    """Record ``instance`` at the end of this cluster's ``instances`` list."""
    tracked = self.instances
    tracked.append(instance)
def add_steps(self, steps):
added_steps = []
for step in steps:
@ -390,6 +406,17 @@ class ElasticMapReduceBackend(BaseBackend):
result_groups.append(group)
return result_groups
def add_instances(self, cluster_id, instances, instance_group):
    """Launch EC2 instances for an instance group and attach them to a cluster.

    :param cluster_id: key of the target cluster in ``self.clusters``.
    :param instances: mapping describing the group; ``instance_count`` is
        read from it and the whole mapping is also forwarded to the EC2
        backend as keyword arguments.
    :param instance_group: instance group object stored on every
        ``FakeInstance`` that gets created.
    """
    # Resolve the cluster first so an unknown id fails before anything is
    # launched on the EC2 backend.
    cluster = self.clusters[cluster_id]
    # NOTE(review): the "" and [] positional arguments are presumably the
    # user data and security group names - confirm against the EC2 backend.
    launched = self.ec2_backend.add_instances(
        EXAMPLE_AMI_ID, instances["instance_count"], "", [], **instances
    )
    for ec2_instance in launched.instances:
        cluster.add_instance(
            FakeInstance(
                ec2_instance_id=ec2_instance.id, instance_group=instance_group,
            )
        )
def add_job_flow_steps(self, job_flow_id, steps):
cluster = self.clusters[job_flow_id]
steps = cluster.add_steps(steps)
@ -485,6 +512,25 @@ class ElasticMapReduceBackend(BaseBackend):
)
return groups[start_idx : start_idx + max_items], marker
def list_instances(
    self, cluster_id, marker=None, instance_group_id=None, instance_group_types=None
):
    """Return one page of a cluster's instances and the next-page marker.

    Instances are ordered by their ``id``, optionally filtered, and then
    sliced into pages of at most 50 entries.

    :param cluster_id: key of the cluster in ``self.clusters``.
    :param marker: stringified start offset as returned by a previous call,
        or ``None`` to start from the first instance.
    :param instance_group_id: when truthy, keep only instances whose
        ``instance_group.id`` equals this value.
    :param instance_group_types: when truthy, keep only instances whose
        ``instance_group.role`` is contained in this collection.
    :return: ``(page, next_marker)``; ``next_marker`` is ``None`` when the
        page is the last one of the *filtered* result.
    :raises ValueError: if ``marker`` is not ``None`` and not an integer string.
    """
    max_items = 50
    matching = sorted(self.clusters[cluster_id].instances, key=lambda x: x.id)

    # Filter BEFORE paginating: the marker has to describe the filtered
    # result, otherwise a filtered request can hand out a marker that points
    # at an empty page.
    if instance_group_id:
        matching = [
            inst for inst in matching if inst.instance_group.id == instance_group_id
        ]
    if instance_group_types:
        matching = [
            inst
            for inst in matching
            if inst.instance_group.role in instance_group_types
        ]

    start_idx = 0 if marker is None else int(marker)
    end_idx = start_idx + max_items
    next_marker = None if len(matching) <= end_idx else str(end_idx)

    page = matching[start_idx:end_idx]
    # Only the instances actually returned need their EC2 details attached.
    for inst in page:
        inst.details = self.ec2_backend.get_instance(inst.ec2_instance_id)
    return page, next_marker
def list_steps(self, cluster_id, marker=None, step_ids=None, step_states=None):
max_items = 50
steps = self.clusters[cluster_id].steps

View File

@ -185,8 +185,20 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(LIST_INSTANCE_GROUPS_TEMPLATE)
return template.render(instance_groups=instance_groups, marker=marker)
@generate_boto3_response("ListInstances")
def list_instances(self):
    """Handle the ``ListInstances`` action.

    Reads ``ClusterId``, ``Marker``, ``InstanceGroupId`` and
    ``InstanceGroupTypes`` from the request, asks the backend for the
    matching page of instances and renders ``LIST_INSTANCES_TEMPLATE`` with
    the resulting ``instances`` and ``marker``.
    """
    # The former unconditional ``raise NotImplementedError`` placeholder is
    # gone; it made everything below unreachable.
    cluster_id = self._get_param("ClusterId")
    marker = self._get_param("Marker")
    instance_group_id = self._get_param("InstanceGroupId")
    instance_group_types = self._get_param("InstanceGroupTypes")
    instances, marker = self.backend.list_instances(
        cluster_id,
        marker=marker,
        instance_group_id=instance_group_id,
        instance_group_types=instance_group_types,
    )
    template = self.response_template(LIST_INSTANCES_TEMPLATE)
    return template.render(instances=instances, marker=marker)
@generate_boto3_response("ListSteps")
def list_steps(self):
@ -395,7 +407,13 @@ class ElasticMapReduceResponse(BaseResponse):
self._parse_ebs_configuration(ig)
# Adding support for auto_scaling_policy
Unflattener.unflatten_complex_params(ig, "auto_scaling_policy")
self.backend.add_instance_groups(cluster.id, instance_groups)
instance_group_result = self.backend.add_instance_groups(
cluster.id, instance_groups
)
for i in range(0, len(instance_group_result)):
self.backend.add_instances(
cluster.id, instance_groups[i], instance_group_result[i]
)
tags = self._get_list_prefix("Tags.member")
if tags:
@ -1100,6 +1118,55 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
</ResponseMetadata>
</ListInstanceGroupsResponse>"""
# Template for the ListInstances response. It is rendered with ``instances``
# (objects exposing ``id``, ``ec2_instance_id``, ``instance_group``,
# ``instance_fleet_id`` and ``details``) and ``marker``. The marker is emitted
# only when it is not none so that clients can request the next page.
LIST_INSTANCES_TEMPLATE = """<ListInstancesResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ListInstancesResult>
<Instances>
{% for instance in instances %}
<member>
<Id>{{ instance.id }}</Id>
<Ec2InstanceId>{{ instance.ec2_instance_id }}</Ec2InstanceId>
<PublicDnsName>{{ instance.details.public_dns }}</PublicDnsName>
<PublicIpAddress>{{ instance.details.public_ip }}</PublicIpAddress>
<PrivateDnsName>{{ instance.details.private_dns }}</PrivateDnsName>
<PrivateIpAddress>{{ instance.details.private_ip }}</PrivateIpAddress>
<InstanceGroupId>{{ instance.instance_group.id }}</InstanceGroupId>
<InstanceFleetId>{{ instance.instance_fleet_id }}</InstanceFleetId>
<Market>{{ instance.instance_group.market }}</Market>
<InstanceType>{{ instance.details.instance_type }}</InstanceType>
<EbsVolumes>
{% for volume in instance.details.block_device_mapping %}
<member>
<Device>{{ volume }}</Device>
<VolumeId>{{ instance.details.block_device_mapping[volume].volume_id }}</VolumeId>
</member>
{% endfor %}
</EbsVolumes>
<Status>
<State>{{ instance.instance_group.state }}</State>
<StateChangeReason>
{% if instance.state_change_reason is not none %}
<Message>{{ instance.state_change_reason }}</Message>
{% endif %}
</StateChangeReason>
<Timeline>
<CreationDateTime>{{ instance.instance_group.creation_datetime.isoformat() }}</CreationDateTime>
{% if instance.instance_group.end_datetime is not none %}
<EndDateTime>{{ instance.instance_group.end_datetime.isoformat() }}</EndDateTime>
{% endif %}
{% if instance.instance_group.ready_datetime is not none %}
<ReadyDateTime>{{ instance.instance_group.ready_datetime.isoformat() }}</ReadyDateTime>
{% endif %}
</Timeline>
</Status>
</member>
{% endfor %}
</Instances>
{% if marker is not none %}
<Marker>{{ marker }}</Marker>
{% endif %}
</ListInstancesResult>
<ResponseMetadata>
<RequestId>4248c46c-71c0-4772-b155-0e992dc30027</RequestId>
</ResponseMetadata>
</ListInstancesResponse>"""
LIST_STEPS_TEMPLATE = """<ListStepsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ListStepsResult>
<Steps>

View File

@ -758,6 +758,51 @@ def test_bootstrap_actions():
x["ScriptPath"].should.equal(y["ScriptBootstrapAction"]["Path"])
@mock_emr
def test_instances():
    """list_instances reports the instances created for a job flow's groups.

    Checks the total count, the per-instance fields against the matching
    input instance group, and the counts when filtering by group type.
    """
    groups_by_name = {g["Name"]: g for g in input_instance_groups}

    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    args["Instances"] = {"InstanceGroups": input_instance_groups}
    cluster_id = client.run_job_flow(**args)["JobFlowId"]

    jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
    instances = client.list_instances(ClusterId=cluster_id)["Instances"]
    expected_total = sum(g["InstanceCount"] for g in input_instance_groups)
    len(instances).should.equal(expected_total)

    for instance in instances:
        instance.should.have.key("InstanceGroupId")
        # Exactly one group of the job flow must own this instance.
        owning_groups = [
            group
            for group in jf["Instances"]["InstanceGroups"]
            if group["InstanceGroupId"] == instance["InstanceGroupId"]
        ]
        len(owning_groups).should.equal(1)
        expected = groups_by_name[owning_groups[0]["Name"]]

        for key in (
            "Id",
            "Ec2InstanceId",
            "PublicDnsName",
            "PublicIpAddress",
            "PrivateDnsName",
            "PrivateIpAddress",
            "InstanceFleetId",
        ):
            instance.should.have.key(key)

        instance["InstanceType"].should.equal(expected["InstanceType"])
        instance["Market"].should.equal(expected["Market"])
        timeline = instance["Status"]["Timeline"]
        timeline["ReadyDateTime"].should.be.a("datetime.datetime")
        timeline["CreationDateTime"].should.be.a("datetime.datetime")
        instance["Status"]["State"].should.equal("RUNNING")

    # Filtering by group type must return only the instances of those roles.
    for roles in [["MASTER"], ["CORE"], ["TASK"], ["MASTER", "TASK"]]:
        listed = client.list_instances(ClusterId=cluster_id, InstanceGroupTypes=roles)[
            "Instances"
        ]
        expected_count = sum(
            g["InstanceCount"]
            for g in input_instance_groups
            if g["InstanceRole"] in roles
        )
        len(listed).should.equal(expected_count)
@mock_emr
def test_instance_groups():
input_groups = dict((g["Name"], g) for g in input_instance_groups)
@ -800,7 +845,6 @@ def test_instance_groups():
x["ReadyDateTime"].should.be.a("datetime.datetime")
x["StartDateTime"].should.be.a("datetime.datetime")
x["State"].should.equal("RUNNING")
groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
for x in groups:
y = deepcopy(input_groups[x["Name"]])