Merge pull request #2785 from addomafi/master

Enhancement: Add support for EbsConfiguration when configuring Instance Groups on the EMR endpoint
This commit is contained in:
Steve Pulec 2020-03-07 11:03:27 -06:00 committed by GitHub
commit 51da32825d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 130 additions and 1 deletions

View File

@ -35,6 +35,7 @@ class FakeInstanceGroup(BaseModel):
name=None,
id=None,
bid_price=None,
ebs_configuration=None,
):
self.id = id or random_instance_group_id()
@ -51,6 +52,7 @@ class FakeInstanceGroup(BaseModel):
self.num_instances = instance_count
self.role = instance_role
self.type = instance_type
self.ebs_configuration = ebs_configuration
self.creation_datetime = datetime.now(pytz.utc)
self.start_datetime = datetime.now(pytz.utc)

View File

@ -73,6 +73,8 @@ class ElasticMapReduceResponse(BaseResponse):
instance_groups = self._get_list_prefix("InstanceGroups.member")
for item in instance_groups:
item["instance_count"] = int(item["instance_count"])
# Adding support to EbsConfiguration
self._parse_ebs_configuration(item)
instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups)
template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE)
return template.render(instance_groups=instance_groups)
@ -324,6 +326,8 @@ class ElasticMapReduceResponse(BaseResponse):
if instance_groups:
for ig in instance_groups:
ig["instance_count"] = int(ig["instance_count"])
# Adding support to EbsConfiguration
self._parse_ebs_configuration(ig)
self.backend.add_instance_groups(cluster.id, instance_groups)
tags = self._get_list_prefix("Tags.member")
@ -335,6 +339,85 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
return template.render(cluster=cluster)
def _has_key_prefix(self, key_prefix, value):
    """Return True if any key of ``value`` starts with ``key_prefix``.

    ``value`` is iterated directly, so for a dict only its keys are
    examined; the mapped values are never inspected. An empty ``value``
    yields False.
    """
    return any(key.startswith(key_prefix) for key in value)
def _parse_ebs_configuration(self, instance_group):
    """Fold flattened ``ebs_configuration.*`` entries into one nested dict.

    ``instance_group`` is a dict of flattened request parameters. Every key
    that starts with ``ebs_configuration`` is collected; the recognised ones
    are removed from ``instance_group`` and rebuilt as::

        {
            "ebs_optimized": <raw value>,
            "ebs_block_device_configs": [
                {
                    "volume_specification": {
                        "size_in_gb": int, "iops": int, "volume_type": <raw>
                    },
                    "volumes_per_instance": int,
                },
                ...
            ],
        }

    The result is stored under ``instance_group["ebs_configuration"]``.
    Unrecognised ``ebs_configuration*`` keys are left untouched in
    ``instance_group`` and are also carried along inside the nested dict.
    When no such keys exist, ``instance_group`` is not modified.

    The dict is modified in place and nothing is returned.
    Raises ValueError if a numeric field cannot be converted with ``int``.
    """
    key_ebs_config = "ebs_configuration"

    # Keep only the EBS related entries
    ebs_configuration = {
        key: value
        for key, value in instance_group.items()
        if key.startswith(key_ebs_config)
    }
    if not ebs_configuration:
        return

    # Names of the nested fields that are extracted
    ebs_optimized = "ebs_optimized"
    ebs_block_device_configs = "ebs_block_device_configs"
    volume_specification = "volume_specification"
    size_in_gb = "size_in_gb"
    volume_type = "volume_type"
    iops = "iops"
    volumes_per_instance = "volumes_per_instance"

    def take(flat_key):
        # Drop the flattened entry from both dicts and hand back its value.
        instance_group.pop(flat_key)
        return ebs_configuration.pop(flat_key)

    # EbsOptimized flag
    key_ebs_optimized = "{0}._{1}".format(key_ebs_config, ebs_optimized)
    if key_ebs_optimized in ebs_configuration:
        ebs_configuration[ebs_optimized] = take(key_ebs_optimized)

    # EBS block device configs: members are numbered from 1 and are read
    # until the first index that has no entries.
    member_keyfmt = "{0}._{1}.member.{{}}".format(
        key_ebs_config, ebs_block_device_configs
    )
    # Numeric fields are cast to int; the volume type is stored as received.
    spec_fields = ((size_in_gb, int), (iops, int), (volume_type, None))

    ebs_blocks = []
    idx = 1
    while True:
        member_key = member_keyfmt.format(idx)
        # The trailing dot keeps "member.1" from also matching "member.10".
        member_prefix = member_key + "."
        if not any(key.startswith(member_prefix) for key in ebs_configuration):
            break

        spec = {}
        for field, convert in spec_fields:
            flat_key = "{0}._{1}._{2}".format(
                member_key, volume_specification, field
            )
            if flat_key in ebs_configuration:
                raw = take(flat_key)
                spec[field] = convert(raw) if convert else raw

        ebs_block = {volume_specification: spec}
        per_instance = "{0}._{1}".format(member_key, volumes_per_instance)
        if per_instance in ebs_configuration:
            ebs_block[volumes_per_instance] = int(take(per_instance))

        ebs_blocks.append(ebs_block)
        idx += 1

    if ebs_blocks:
        ebs_configuration[ebs_block_device_configs] = ebs_blocks
    instance_group[key_ebs_config] = ebs_configuration
@generate_boto3_response("SetTerminationProtection")
def set_termination_protection(self):
termination_protection = self._get_param("TerminationProtected")
@ -754,7 +837,22 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
<BidPrice>{{ instance_group.bid_price }}</BidPrice>
{% endif %}
<Configurations/>
<EbsBlockDevices/>
{% if instance_group.ebs_configuration is not none %}
<EbsBlockDevices>
{% for ebs_block_device in instance_group.ebs_configuration.ebs_block_device_configs %}
{% for i in range(ebs_block_device.volumes_per_instance) %}
<member>
<VolumeSpecification>
<VolumeType>{{ebs_block_device.volume_specification.volume_type}}</VolumeType>
<Iops>{{ebs_block_device.volume_specification.iops}}</Iops>
<SizeInGB>{{ebs_block_device.volume_specification.size_in_gb}}</SizeInGB>
</VolumeSpecification>
<Device>/dev/sd{{i}}</Device>
</member>
{% endfor %}
{% endfor %}
</EbsBlockDevices>
{% endif %}
{% if instance_group.ebs_optimized is not none %}
<EbsOptimized>{{ instance_group.ebs_optimized }}</EbsOptimized>
{% endif %}

View File

@ -60,6 +60,15 @@ input_instance_groups = [
"Market": "SPOT",
"Name": "task-2",
"BidPrice": "0.05",
"EbsConfiguration": {
"EbsBlockDeviceConfigs": [
{
"VolumeSpecification": {"VolumeType": "gp2", "SizeInGB": 800},
"VolumesPerInstance": 6,
},
],
"EbsOptimized": True,
},
},
]
@ -430,6 +439,21 @@ def test_run_job_flow_with_visible_to_all_users():
resp["Cluster"]["VisibleToAllUsers"].should.equal(expected)
def _do_assertion_ebs_configuration(x, y):
    """Assert that the returned EBS block devices match the requested config.

    ``x`` is an instance group as returned by the service; its
    ``EbsBlockDevices`` list is read. ``y`` is the corresponding input
    instance group; its ``EbsConfiguration.EbsBlockDeviceConfigs`` list is
    read. Each requested config is expected to yield ``VolumesPerInstance``
    devices of ``SizeInGB`` each, so both the device count and the summed
    size are compared.
    """
    total_volumes = 0
    total_size = 0
    for ebs_block in y["EbsConfiguration"]["EbsBlockDeviceConfigs"]:
        volumes = ebs_block["VolumesPerInstance"]
        total_volumes += volumes
        # Weight each config's size by its own volume count; multiplying the
        # summed sizes by the summed counts is wrong for multiple configs.
        total_size += ebs_block["VolumeSpecification"]["SizeInGB"] * volumes

    comp_total_size = sum(
        ebs_block["VolumeSpecification"]["SizeInGB"]
        for ebs_block in x["EbsBlockDevices"]
    )
    len(x["EbsBlockDevices"]).should.equal(total_volumes)
    # Compare against the expected total, not the computed value itself.
    comp_total_size.should.equal(total_size)
@mock_emr
def test_run_job_flow_with_instance_groups():
input_groups = dict((g["Name"], g) for g in input_instance_groups)
@ -448,6 +472,9 @@ def test_run_job_flow_with_instance_groups():
if "BidPrice" in y:
x["BidPrice"].should.equal(y["BidPrice"])
if "EbsConfiguration" in y:
_do_assertion_ebs_configuration(x, y)
@mock_emr
def test_run_job_flow_with_custom_ami():
@ -623,6 +650,8 @@ def test_instance_groups():
y = input_groups[x["Name"]]
if hasattr(y, "BidPrice"):
x["BidPrice"].should.equal("BidPrice")
if "EbsConfiguration" in y:
_do_assertion_ebs_configuration(x, y)
# Configurations
# EbsBlockDevices
# EbsOptimized