#2784 Adding missing support for EbsConfiguration on EMR instance groups

This commit is contained in:
addomafi 2020-03-06 15:12:44 -03:00
parent f7acdb9b3a
commit c8dfbe9575
2 changed files with 109 additions and 12 deletions

View File

@ -73,6 +73,8 @@ class ElasticMapReduceResponse(BaseResponse):
instance_groups = self._get_list_prefix("InstanceGroups.member")
for item in instance_groups:
item["instance_count"] = int(item["instance_count"])
# Adding support to EbsConfiguration
self._parse_ebs_configuration(item)
instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups)
template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE)
return template.render(instance_groups=instance_groups)
@ -324,6 +326,8 @@ class ElasticMapReduceResponse(BaseResponse):
if instance_groups:
for ig in instance_groups:
ig["instance_count"] = int(ig["instance_count"])
# Adding support to EbsConfiguration
self._parse_ebs_configuration(ig)
self.backend.add_instance_groups(cluster.id, instance_groups)
tags = self._get_list_prefix("Tags.member")
@ -335,6 +339,83 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
return template.render(cluster=cluster)
def _has_key_prefix(self, key_prefix, value):
for key in value: # iter on both keys and values
if key.startswith(key_prefix):
return True
return False
def _parse_ebs_configuration(self, instance_group):
key_ebs_config = "ebs_configuration"
ebs_configuration = dict()
# Filter only EBS config keys
for key in instance_group:
if key.startswith(key_ebs_config):
ebs_configuration[key] = instance_group[key]
if len(ebs_configuration) > 0:
# Key that should be extracted
ebs_optimized = "ebs_optimized"
ebs_block_device_configs = "ebs_block_device_configs"
volume_specification = "volume_specification"
size_in_gb = "size_in_gb"
volume_type = "volume_type"
iops = "iops"
volumes_per_instance = "volumes_per_instance"
key_ebs_optimized = f"{key_ebs_config}._{ebs_optimized}"
# EbsOptimized config
if key_ebs_optimized in ebs_configuration:
instance_group.pop(key_ebs_optimized)
ebs_configuration[ebs_optimized] = ebs_configuration.pop(
key_ebs_optimized
)
# Ebs Blocks
ebs_blocks = []
idx = 1
keyfmt = f"{key_ebs_config}._{ebs_block_device_configs}.member.{{}}"
key = keyfmt.format(idx)
while self._has_key_prefix(key, ebs_configuration):
vlespc_keyfmt = f"{key}._{volume_specification}._{{}}"
vol_size = vlespc_keyfmt.format(size_in_gb)
vol_iops = vlespc_keyfmt.format(iops)
vol_type = vlespc_keyfmt.format(volume_type)
ebs_block = dict()
ebs_block[volume_specification] = dict()
if vol_size in ebs_configuration:
instance_group.pop(vol_size)
ebs_block[volume_specification][size_in_gb] = int(
ebs_configuration.pop(vol_size)
)
if vol_iops in ebs_configuration:
instance_group.pop(vol_iops)
ebs_block[volume_specification][iops] = ebs_configuration.pop(
vol_iops
)
if vol_type in ebs_configuration:
instance_group.pop(vol_type)
ebs_block[volume_specification][
volume_type
] = ebs_configuration.pop(vol_type)
per_instance = f"{key}._{volumes_per_instance}"
if per_instance in ebs_configuration:
instance_group.pop(per_instance)
ebs_block[volumes_per_instance] = int(
ebs_configuration.pop(per_instance)
)
if len(ebs_block) > 0:
ebs_blocks.append(ebs_block)
idx += 1
key = keyfmt.format(idx)
if len(ebs_blocks) > 0:
ebs_configuration[ebs_block_device_configs] = ebs_blocks
instance_group[key_ebs_config] = ebs_configuration
@generate_boto3_response("SetTerminationProtection")
def set_termination_protection(self):
termination_protection = self._get_param("TerminationProtected")
@ -754,7 +835,22 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
<BidPrice>{{ instance_group.bid_price }}</BidPrice>
{% endif %}
<Configurations/>
<EbsBlockDevices/>
{% if instance_group.ebs_configuration is not none %}
<EbsBlockDevices>
{% for ebs_block_device in instance_group.ebs_configuration.ebs_block_device_configs %}
{% for i in range(ebs_block_device.volumes_per_instance) %}
<member>
<VolumeSpecification>
<VolumeType>{{ebs_block_device.volume_specification.volume_type}}</VolumeType>
<Iops>{{ebs_block_device.volume_specification.iops}}</Iops>
<SizeInGB>{{ebs_block_device.volume_specification.size_in_gb}}</SizeInGB>
</VolumeSpecification>
<Device>/dev/sd{{i}}</Device>
</member>
{% endfor %}
{% endfor %}
</EbsBlockDevices>
{% endif %}
{% if instance_group.ebs_optimized is not none %}
<EbsOptimized>{{ instance_group.ebs_optimized }}</EbsOptimized>
{% endif %}

View File

@ -60,14 +60,6 @@ input_instance_groups = [
"Market": "SPOT",
"Name": "task-2",
"BidPrice": "0.05",
},
{
"InstanceCount": 10,
"InstanceRole": "TASK",
"InstanceType": "c1.xlarge",
"Market": "SPOT",
"Name": "task-3",
"BidPrice": "0.05",
"EbsConfiguration": {
"EbsBlockDeviceConfigs": [
{
@ -623,8 +615,6 @@ def test_instance_groups():
y = input_groups[x["Name"]]
if hasattr(y, "BidPrice"):
x["BidPrice"].should.equal("BidPrice")
if "EbsConfiguration" in y:
x["EbsConfiguration"].should.equal(y["EbsConfiguration"])
x["CreationDateTime"].should.be.a("datetime.datetime")
# x['EndDateTime'].should.be.a('datetime.datetime')
x.should.have.key("InstanceGroupId")
@ -645,7 +635,18 @@ def test_instance_groups():
if hasattr(y, "BidPrice"):
x["BidPrice"].should.equal("BidPrice")
if "EbsConfiguration" in y:
x["EbsConfiguration"].should.equal(y["EbsConfiguration"])
total_volumes = 0
total_size = 0
for ebs_block in y["EbsConfiguration"]["EbsBlockDeviceConfigs"]:
total_volumes += ebs_block["VolumesPerInstance"]
total_size += ebs_block["VolumeSpecification"]["SizeInGB"]
# Multiply by total volumes
total_size = total_size * total_volumes
comp_total_size = 0
for ebs_block in x["EbsBlockDevices"]:
comp_total_size += ebs_block["VolumeSpecification"]["SizeInGB"]
len(x["EbsBlockDevices"]).should.equal(total_volumes)
comp_total_size.should.equal(comp_total_size)
# Configurations
# EbsBlockDevices
# EbsOptimized