Support for autoscaling policies in run_jobflow, add_instance_group and list_instance_groups. (#3288)

Support for cluster_id parameter substitution in autoscaling policy cloudwatch alarm dimensions.
Support for the new operations put_auto_scaling_policy and remove_auto_scaling_policy.

Co-authored-by: Joseph Weitekamp <jweite@amazon.com>
This commit is contained in:
jweite 2020-09-23 06:21:45 -04:00 committed by GitHub
parent d54eafa420
commit cd20668e9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 537 additions and 10 deletions

View File

@ -7,7 +7,12 @@ from boto3 import Session
from dateutil.parser import parse as dtparse from dateutil.parser import parse as dtparse
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.emr.exceptions import EmrError from moto.emr.exceptions import EmrError
from .utils import random_instance_group_id, random_cluster_id, random_step_id from .utils import (
random_instance_group_id,
random_cluster_id,
random_step_id,
CamelToUnderscoresWalker,
)
class FakeApplication(BaseModel): class FakeApplication(BaseModel):
@ -28,6 +33,7 @@ class FakeBootstrapAction(BaseModel):
class FakeInstanceGroup(BaseModel): class FakeInstanceGroup(BaseModel):
def __init__( def __init__(
self, self,
cluster_id,
instance_count, instance_count,
instance_role, instance_role,
instance_type, instance_type,
@ -36,8 +42,10 @@ class FakeInstanceGroup(BaseModel):
id=None, id=None,
bid_price=None, bid_price=None,
ebs_configuration=None, ebs_configuration=None,
auto_scaling_policy=None,
): ):
self.id = id or random_instance_group_id() self.id = id or random_instance_group_id()
self.cluster_id = cluster_id
self.bid_price = bid_price self.bid_price = bid_price
self.market = market self.market = market
@ -53,7 +61,7 @@ class FakeInstanceGroup(BaseModel):
self.role = instance_role self.role = instance_role
self.type = instance_type self.type = instance_type
self.ebs_configuration = ebs_configuration self.ebs_configuration = ebs_configuration
self.auto_scaling_policy = auto_scaling_policy
self.creation_datetime = datetime.now(pytz.utc) self.creation_datetime = datetime.now(pytz.utc)
self.start_datetime = datetime.now(pytz.utc) self.start_datetime = datetime.now(pytz.utc)
self.ready_datetime = datetime.now(pytz.utc) self.ready_datetime = datetime.now(pytz.utc)
@ -63,6 +71,34 @@ class FakeInstanceGroup(BaseModel):
def set_instance_count(self, instance_count): def set_instance_count(self, instance_count):
self.num_instances = instance_count self.num_instances = instance_count
@property
def auto_scaling_policy(self):
    """Return the stored auto scaling policy, or None when none is set."""
    return self._auto_scaling_policy

@auto_scaling_policy.setter
def auto_scaling_policy(self, value):
    """Store ``value`` as this group's auto scaling policy.

    ``None`` is stored as-is. Any other value is converted with
    ``CamelToUnderscoresWalker.parse``, given a ``status`` of
    ``{"state": "ATTACHED"}``, and every alarm dimension whose value is
    exactly ``${emr.clusterId}`` is replaced with ``self.cluster_id``.
    """
    if value is None:
        self._auto_scaling_policy = value
        return

    policy = CamelToUnderscoresWalker.parse(value)
    self._auto_scaling_policy = policy
    policy["status"] = {"state": "ATTACHED"}

    # Transform common ${emr.clusterId} placeholder in any dimensions it occurs in.
    if "rules" not in policy:
        return
    for rule in policy["rules"]:
        if "trigger" not in rule:
            continue
        trigger = rule["trigger"]
        if "cloud_watch_alarm_definition" not in trigger:
            continue
        alarm_definition = trigger["cloud_watch_alarm_definition"]
        if "dimensions" not in alarm_definition:
            continue
        for dimension in alarm_definition["dimensions"]:
            if "value" in dimension and dimension["value"] == "${emr.clusterId}":
                dimension["value"] = self.cluster_id
class FakeStep(BaseModel): class FakeStep(BaseModel):
def __init__( def __init__(
@ -319,7 +355,7 @@ class ElasticMapReduceBackend(BaseBackend):
cluster = self.clusters[cluster_id] cluster = self.clusters[cluster_id]
result_groups = [] result_groups = []
for instance_group in instance_groups: for instance_group in instance_groups:
group = FakeInstanceGroup(**instance_group) group = FakeInstanceGroup(cluster_id=cluster_id, **instance_group)
self.instance_groups[group.id] = group self.instance_groups[group.id] = group
cluster.add_instance_group(group) cluster.add_instance_group(group)
result_groups.append(group) result_groups.append(group)
@ -465,6 +501,25 @@ class ElasticMapReduceBackend(BaseBackend):
clusters.append(cluster) clusters.append(cluster)
return clusters return clusters
def put_auto_scaling_policy(self, instance_group_id, auto_scaling_policy):
    """Assign ``auto_scaling_policy`` to the instance group with the given id.

    Returns the updated instance group, or None when the lookup for
    ``instance_group_id`` yields no groups.
    """
    matches = self.get_instance_groups(instance_group_ids=[instance_group_id])
    if len(matches) == 0:
        return None
    target_group = matches[0]
    target_group.auto_scaling_policy = auto_scaling_policy
    return target_group
def remove_auto_scaling_policy(self, cluster_id, instance_group_id):
    """Clear the auto scaling policy of the instance group with the given id.

    ``cluster_id`` is accepted but not used for the lookup. Always returns
    None; nothing happens when no group matches ``instance_group_id``.
    """
    matches = self.get_instance_groups(instance_group_ids=[instance_group_id])
    if len(matches) != 0:
        matches[0].auto_scaling_policy = None
emr_backends = {} emr_backends = {}
for region in Session().get_available_regions("emr"): for region in Session().get_available_regions("emr"):

View File

@ -13,7 +13,7 @@ from moto.core.responses import xml_to_json_response
from moto.core.utils import tags_from_query_string from moto.core.utils import tags_from_query_string
from .exceptions import EmrError from .exceptions import EmrError
from .models import emr_backends from .models import emr_backends
from .utils import steps_from_query_string from .utils import steps_from_query_string, Unflattener
def generate_boto3_response(operation): def generate_boto3_response(operation):
@ -76,6 +76,8 @@ class ElasticMapReduceResponse(BaseResponse):
item["instance_count"] = int(item["instance_count"]) item["instance_count"] = int(item["instance_count"])
# Adding support to EbsConfiguration # Adding support to EbsConfiguration
self._parse_ebs_configuration(item) self._parse_ebs_configuration(item)
# Adding support for auto_scaling_policy
Unflattener.unflatten_complex_params(item, "auto_scaling_policy")
instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups) instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups)
template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE) template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE)
return template.render(instance_groups=instance_groups) return template.render(instance_groups=instance_groups)
@ -329,6 +331,8 @@ class ElasticMapReduceResponse(BaseResponse):
ig["instance_count"] = int(ig["instance_count"]) ig["instance_count"] = int(ig["instance_count"])
# Adding support to EbsConfiguration # Adding support to EbsConfiguration
self._parse_ebs_configuration(ig) self._parse_ebs_configuration(ig)
# Adding support for auto_scaling_policy
Unflattener.unflatten_complex_params(ig, "auto_scaling_policy")
self.backend.add_instance_groups(cluster.id, instance_groups) self.backend.add_instance_groups(cluster.id, instance_groups)
tags = self._get_list_prefix("Tags.member") tags = self._get_list_prefix("Tags.member")
@ -442,6 +446,25 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE) template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE)
return template.render() return template.render()
@generate_boto3_response("PutAutoScalingPolicy")
def put_auto_scaling_policy(self):
    """Handle PutAutoScalingPolicy.

    Reads ``ClusterId``, ``InstanceGroupId`` and ``AutoScalingPolicy`` from
    the request, stores the policy through the backend and renders the
    PUT_AUTO_SCALING_POLICY template with the resulting instance group.
    """
    cluster_id = self._get_param("ClusterId")
    group_id = self._get_param("InstanceGroupId")
    policy = self._get_param("AutoScalingPolicy")
    updated_group = self.backend.put_auto_scaling_policy(group_id, policy)
    return self.response_template(PUT_AUTO_SCALING_POLICY).render(
        cluster_id=cluster_id, instance_group=updated_group
    )
@generate_boto3_response("RemoveAutoScalingPolicy")
def remove_auto_scaling_policy(self):
    """Handle RemoveAutoScalingPolicy.

    Reads ``ClusterId`` and ``InstanceGroupId`` from the request, asks the
    backend to detach the policy from that instance group and renders the
    REMOVE_AUTO_SCALING_POLICY template.
    """
    cluster_id = self._get_param("ClusterId")
    instance_group_id = self._get_param("InstanceGroupId")
    # Use the backend's dedicated removal operation instead of "putting" a
    # None policy, so the request handler and the backend API stay in step.
    self.backend.remove_auto_scaling_policy(cluster_id, instance_group_id)
    template = self.response_template(REMOVE_AUTO_SCALING_POLICY)
    # The template contains no placeholders, so no render variables are passed.
    return template.render()
ADD_INSTANCE_GROUPS_TEMPLATE = """<AddInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31"> ADD_INSTANCE_GROUPS_TEMPLATE = """<AddInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<AddInstanceGroupsResult> <AddInstanceGroupsResult>
@ -854,6 +877,107 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
{% endfor %} {% endfor %}
</EbsBlockDevices> </EbsBlockDevices>
{% endif %} {% endif %}
{% if instance_group.auto_scaling_policy is not none %}
<AutoScalingPolicy>
{% if instance_group.auto_scaling_policy.constraints is not none %}
<Constraints>
{% if instance_group.auto_scaling_policy.constraints.min_capacity is not none %}
<MinCapacity>{{instance_group.auto_scaling_policy.constraints.min_capacity}}</MinCapacity>
{% endif %}
{% if instance_group.auto_scaling_policy.constraints.max_capacity is not none %}
<MaxCapacity>{{instance_group.auto_scaling_policy.constraints.max_capacity}}</MaxCapacity>
{% endif %}
</Constraints>
{% endif %}
{% if instance_group.auto_scaling_policy.rules is not none %}
<Rules>
{% for rule in instance_group.auto_scaling_policy.rules %}
<member>
{% if 'name' in rule %}
<Name>{{rule['name']}}</Name>
{% endif %}
{% if 'description' in rule %}
<Description>{{rule['description']}}</Description>
{% endif %}
{% if 'action' in rule %}
<Action>
{% if 'market' in rule['action'] %}
<Market>{{rule['action']['market']}}</Market>
{% endif %}
{% if 'simple_scaling_policy_configuration' in rule['action'] %}
<SimpleScalingPolicyConfiguration>
{% if 'adjustment_type' in rule['action']['simple_scaling_policy_configuration'] %}
<AdjustmentType>{{rule['action']['simple_scaling_policy_configuration']['adjustment_type']}}</AdjustmentType>
{% endif %}
{% if 'scaling_adjustment' in rule['action']['simple_scaling_policy_configuration'] %}
<ScalingAdjustment>{{rule['action']['simple_scaling_policy_configuration']['scaling_adjustment']}}</ScalingAdjustment>
{% endif %}
{% if 'cool_down' in rule['action']['simple_scaling_policy_configuration'] %}
<CoolDown>{{rule['action']['simple_scaling_policy_configuration']['cool_down']}}</CoolDown>
{% endif %}
</SimpleScalingPolicyConfiguration>
{% endif %}
</Action>
{% endif %}
{% if 'trigger' in rule %}
<Trigger>
{% if 'cloud_watch_alarm_definition' in rule['trigger'] %}
<CloudWatchAlarmDefinition>
{% if 'comparison_operator' in rule['trigger']['cloud_watch_alarm_definition'] %}
<ComparisonOperator>{{rule['trigger']['cloud_watch_alarm_definition']['comparison_operator']}}</ComparisonOperator>
{% endif %}
{% if 'evaluation_periods' in rule['trigger']['cloud_watch_alarm_definition'] %}
<EvaluationPeriods>{{rule['trigger']['cloud_watch_alarm_definition']['evaluation_periods']}}</EvaluationPeriods>
{% endif %}
{% if 'metric_name' in rule['trigger']['cloud_watch_alarm_definition'] %}
<MetricName>{{rule['trigger']['cloud_watch_alarm_definition']['metric_name']}}</MetricName>
{% endif %}
{% if 'namespace' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Namespace>{{rule['trigger']['cloud_watch_alarm_definition']['namespace']}}</Namespace>
{% endif %}
{% if 'period' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Period>{{rule['trigger']['cloud_watch_alarm_definition']['period']}}</Period>
{% endif %}
{% if 'statistic' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Statistic>{{rule['trigger']['cloud_watch_alarm_definition']['statistic']}}</Statistic>
{% endif %}
{% if 'threshold' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Threshold>{{rule['trigger']['cloud_watch_alarm_definition']['threshold']}}</Threshold>
{% endif %}
{% if 'unit' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Unit>{{rule['trigger']['cloud_watch_alarm_definition']['unit']}}</Unit>
{% endif %}
{% if 'dimensions' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Dimensions>
{% for dimension in rule['trigger']['cloud_watch_alarm_definition']['dimensions'] %}
<member>
{% if 'key' in dimension %}
<Key>{{dimension['key']}}</Key>
{% endif %}
{% if 'value' in dimension %}
<Value>{{dimension['value']}}</Value>
{% endif %}
</member>
{% endfor %}
</Dimensions>
{% endif %}
</CloudWatchAlarmDefinition>
{% endif %}
</Trigger>
{% endif %}
</member>
{% endfor %}
</Rules>
{% endif %}
{% if instance_group.auto_scaling_policy.status is not none %}
<Status>
{% if 'state' in instance_group.auto_scaling_policy.status %}
<State>{{instance_group.auto_scaling_policy.status['state']}}</State>
{% endif %}
</Status>
{% endif %}
</AutoScalingPolicy>
{% endif %}
{% if instance_group.ebs_optimized is not none %} {% if instance_group.ebs_optimized is not none %}
<EbsOptimized>{{ instance_group.ebs_optimized }}</EbsOptimized> <EbsOptimized>{{ instance_group.ebs_optimized }}</EbsOptimized>
{% endif %} {% endif %}
@ -989,3 +1113,120 @@ TERMINATE_JOB_FLOWS_TEMPLATE = """<TerminateJobFlowsResponse xmlns="http://elast
<RequestId>2690d7eb-ed86-11dd-9877-6fad448a8419</RequestId> <RequestId>2690d7eb-ed86-11dd-9877-6fad448a8419</RequestId>
</ResponseMetadata> </ResponseMetadata>
</TerminateJobFlowsResponse>""" </TerminateJobFlowsResponse>"""
# Jinja2 template for the PutAutoScalingPolicy response. It is rendered with
# ``cluster_id`` and ``instance_group`` and emits the group's id plus its
# auto scaling policy (constraints, rules and status), reading the policy
# through its snake_case keys.
PUT_AUTO_SCALING_POLICY = """<PutAutoScalingPolicyResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<PutAutoScalingPolicyResult>
<ClusterId>{{cluster_id}}</ClusterId>
<InstanceGroupId>{{instance_group.id}}</InstanceGroupId>
{% if instance_group.auto_scaling_policy is not none %}
<AutoScalingPolicy>
{% if instance_group.auto_scaling_policy.constraints is not none %}
<Constraints>
{% if instance_group.auto_scaling_policy.constraints.min_capacity is not none %}
<MinCapacity>{{instance_group.auto_scaling_policy.constraints.min_capacity}}</MinCapacity>
{% endif %}
{% if instance_group.auto_scaling_policy.constraints.max_capacity is not none %}
<MaxCapacity>{{instance_group.auto_scaling_policy.constraints.max_capacity}}</MaxCapacity>
{% endif %}
</Constraints>
{% endif %}
{% if instance_group.auto_scaling_policy.rules is not none %}
<Rules>
{% for rule in instance_group.auto_scaling_policy.rules %}
<member>
{% if 'name' in rule %}
<Name>{{rule['name']}}</Name>
{% endif %}
{% if 'description' in rule %}
<Description>{{rule['description']}}</Description>
{% endif %}
{% if 'action' in rule %}
<Action>
{% if 'market' in rule['action'] %}
<Market>{{rule['action']['market']}}</Market>
{% endif %}
{% if 'simple_scaling_policy_configuration' in rule['action'] %}
<SimpleScalingPolicyConfiguration>
{% if 'adjustment_type' in rule['action']['simple_scaling_policy_configuration'] %}
<AdjustmentType>{{rule['action']['simple_scaling_policy_configuration']['adjustment_type']}}</AdjustmentType>
{% endif %}
{% if 'scaling_adjustment' in rule['action']['simple_scaling_policy_configuration'] %}
<ScalingAdjustment>{{rule['action']['simple_scaling_policy_configuration']['scaling_adjustment']}}</ScalingAdjustment>
{% endif %}
{% if 'cool_down' in rule['action']['simple_scaling_policy_configuration'] %}
<CoolDown>{{rule['action']['simple_scaling_policy_configuration']['cool_down']}}</CoolDown>
{% endif %}
</SimpleScalingPolicyConfiguration>
{% endif %}
</Action>
{% endif %}
{% if 'trigger' in rule %}
<Trigger>
{% if 'cloud_watch_alarm_definition' in rule['trigger'] %}
<CloudWatchAlarmDefinition>
{% if 'comparison_operator' in rule['trigger']['cloud_watch_alarm_definition'] %}
<ComparisonOperator>{{rule['trigger']['cloud_watch_alarm_definition']['comparison_operator']}}</ComparisonOperator>
{% endif %}
{% if 'evaluation_periods' in rule['trigger']['cloud_watch_alarm_definition'] %}
<EvaluationPeriods>{{rule['trigger']['cloud_watch_alarm_definition']['evaluation_periods']}}</EvaluationPeriods>
{% endif %}
{% if 'metric_name' in rule['trigger']['cloud_watch_alarm_definition'] %}
<MetricName>{{rule['trigger']['cloud_watch_alarm_definition']['metric_name']}}</MetricName>
{% endif %}
{% if 'namespace' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Namespace>{{rule['trigger']['cloud_watch_alarm_definition']['namespace']}}</Namespace>
{% endif %}
{% if 'period' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Period>{{rule['trigger']['cloud_watch_alarm_definition']['period']}}</Period>
{% endif %}
{% if 'statistic' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Statistic>{{rule['trigger']['cloud_watch_alarm_definition']['statistic']}}</Statistic>
{% endif %}
{% if 'threshold' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Threshold>{{rule['trigger']['cloud_watch_alarm_definition']['threshold']}}</Threshold>
{% endif %}
{% if 'unit' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Unit>{{rule['trigger']['cloud_watch_alarm_definition']['unit']}}</Unit>
{% endif %}
{% if 'dimensions' in rule['trigger']['cloud_watch_alarm_definition'] %}
<Dimensions>
{% for dimension in rule['trigger']['cloud_watch_alarm_definition']['dimensions'] %}
<member>
{% if 'key' in dimension %}
<Key>{{dimension['key']}}</Key>
{% endif %}
{% if 'value' in dimension %}
<Value>{{dimension['value']}}</Value>
{% endif %}
</member>
{% endfor %}
</Dimensions>
{% endif %}
</CloudWatchAlarmDefinition>
{% endif %}
</Trigger>
{% endif %}
</member>
{% endfor %}
</Rules>
{% endif %}
{% if instance_group.auto_scaling_policy.status is not none %}
<Status>
{% if 'state' in instance_group.auto_scaling_policy.status %}
<State>{{instance_group.auto_scaling_policy.status['state']}}</State>
{% endif %}
</Status>
{% endif %}
</AutoScalingPolicy>
{% endif %}
</PutAutoScalingPolicyResult>
<ResponseMetadata>
<RequestId>d47379d9-b505-49af-9335-a68950d82535</RequestId>
</ResponseMetadata>
</PutAutoScalingPolicyResponse>"""
# Response body for RemoveAutoScalingPolicy. It contains no template
# placeholders: only response metadata with a fixed request id.
REMOVE_AUTO_SCALING_POLICY = """<RemoveAutoScalingPolicyResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ResponseMetadata>
<RequestId>c04a1042-5340-4c0a-a7b5-7779725ce4f7</RequestId>
</ResponseMetadata>
</RemoveAutoScalingPolicyResponse>"""

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import random import random
import string import string
from moto.core.utils import camelcase_to_underscores
import six import six
@ -37,3 +38,109 @@ def steps_from_query_string(querystring_dict):
idx += 1 idx += 1
steps.append(step) steps.append(step)
return steps return steps
class Unflattener:
    """Rebuild nested dict/list structures from flattened, dot-separated keys."""

    @staticmethod
    def unflatten_complex_params(input_dict, param_name):
        """Unflatten (a portion of) a dict with complex keys, in place.

        The moto request parser flattens incoming request bodies, which is
        generally helpful, but for nested dicts/lists can result in a
        hard-to-manage parameter explosion. This function selectively
        unflattens one parameter: every key that is ``param_name`` itself or
        starts with ``param_name + "."`` is removed from ``input_dict`` and
        replaced with a deep dict/list structure stored under the root
        component of the complex name.

        Complex keys are composed of multiple components separated by
        periods. Leading/trailing underscores on a component are stripped.
        List indexes are represented with two components, ``member`` and the
        1-based index number.

        :param input_dict: flattened dict, modified in place.
        :param param_name: root component of the keys to unflatten.
        :returns: None.
        """
        # Match whole components only, so that an unrelated key which merely
        # shares the prefix (e.g. "<param_name>_extra.x") is left untouched.
        prefix = param_name + "."
        items_to_process = {
            k: v
            for k, v in input_dict.items()
            if k == param_name or k.startswith(prefix)
        }
        if not items_to_process:
            return

        for k in items_to_process:
            del input_dict[k]

        for k, v in items_to_process.items():
            Unflattener._set_deep(k, input_dict, v)

    @staticmethod
    def _set_deep(complex_key, container, value):
        """Store ``value`` in ``container`` at the path described by ``complex_key``.

        Intermediate dicts/lists are created on the way down when missing.
        """
        keys = complex_key.split(".")
        keys.reverse()  # so that pop() yields the components left to right

        while keys:
            key = keys.pop().strip("_")
            if not keys:
                # Last component: this is where the value itself goes.
                Unflattener._add_to_container(container, key, value)
                break

            # A following "member" component marks this level as a list.
            if keys[-1] == "member":
                keys.pop()
                new_child = []
            else:
                new_child = {}

            if Unflattener._key_in_container(container, key):
                container = Unflattener._get_child(container, key)
            else:
                container = Unflattener._add_to_container(container, key, new_child)

    @staticmethod
    def _add_to_container(container, key, value):
        """Store ``value`` under ``key`` (dict key or 1-based list index) and return it."""
        if isinstance(container, dict):
            container[key] = value
        elif isinstance(container, list):
            i = int(key)
            # Pad with None so that index i - 1 exists.
            while len(container) < i:
                container.append(None)
            container[i - 1] = value
        return value

    @staticmethod
    def _get_child(container, key):
        """Return the child stored under ``key`` (dict key or 1-based list index)."""
        if isinstance(container, dict):
            return container[key]
        elif isinstance(container, list):
            i = int(key)
            return container[i - 1]

    @staticmethod
    def _key_in_container(container, key):
        """Report whether ``container`` already holds a real child for ``key``."""
        if isinstance(container, dict):
            return key in container
        elif isinstance(container, list):
            i = int(key)
            # A None slot is only padding added for a higher index that was
            # seen first; treat it as absent so the child gets created rather
            # than descending into None and silently dropping the value.
            return len(container) >= i and container[i - 1] is not None
class CamelToUnderscoresWalker:
    """Walk dict/list hierarchies and rewrite every dict key from CamelCase to snake_case."""

    @staticmethod
    def parse(x):
        """Convert ``x`` according to its type: dict, list, or anything else."""
        if isinstance(x, dict):
            handler = CamelToUnderscoresWalker.parse_dict
        elif isinstance(x, list):
            handler = CamelToUnderscoresWalker.parse_list
        else:
            handler = CamelToUnderscoresWalker.parse_scalar
        return handler(x)

    @staticmethod
    def parse_dict(x):
        """Return a new dict with converted keys and recursively converted values."""
        return {
            camelcase_to_underscores(name): CamelToUnderscoresWalker.parse(item)
            for name, item in x.items()
        }

    @staticmethod
    def parse_list(x):
        """Return a new list whose elements are recursively converted."""
        return [CamelToUnderscoresWalker.parse(element) for element in x]

    @staticmethod
    def parse_scalar(x):
        """Return ``x`` unchanged."""
        return x

View File

@ -476,6 +476,118 @@ def test_run_job_flow_with_instance_groups():
_do_assertion_ebs_configuration(x, y) _do_assertion_ebs_configuration(x, y)
# Shared AutoScalingPolicy fixture for the autoscaling tests: capacity
# constraints plus a single scale-out rule. The dimension value
# "${emr.clusterId}" is a placeholder; the tests compare responses against a
# copy in which it has been replaced by the real cluster id.
auto_scaling_policy = {
"Constraints": {"MinCapacity": 2, "MaxCapacity": 10},
"Rules": [
{
"Name": "Default-scale-out",
"Description": "Replicates the default scale-out rule in the console for YARN memory.",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300,
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "LESS_THAN",
"EvaluationPeriods": 1,
"MetricName": "YARNMemoryAvailablePercentage",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Threshold": 15.0,
"Statistic": "AVERAGE",
"Unit": "PERCENT",
"Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}],
}
},
}
],
}
@mock_emr
def test_run_job_flow_with_instance_groups_with_autoscaling():
    """run_job_flow with autoscaling policies on two groups returns them ATTACHED.

    The returned policies must equal the submitted one with the
    ``${emr.clusterId}`` placeholder replaced by the real cluster id.
    """
    # Work on a private copy: attaching the policy to the dicts inside the
    # module-level ``input_instance_groups`` would leak AutoScalingPolicy into
    # every other test that reads that shared fixture.
    instance_groups = deepcopy(input_instance_groups)
    input_groups = dict((g["Name"], g) for g in instance_groups)
    input_groups["core"]["AutoScalingPolicy"] = auto_scaling_policy
    input_groups["task-1"]["AutoScalingPolicy"] = auto_scaling_policy

    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    args["Instances"] = {"InstanceGroups": instance_groups}
    cluster_id = client.run_job_flow(**args)["JobFlowId"]

    groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
    for x in groups:
        y = deepcopy(input_groups[x["Name"]])
        if "AutoScalingPolicy" in y:
            x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED")
            returned_policy = deepcopy(x["AutoScalingPolicy"])
            auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy(
                y["AutoScalingPolicy"], cluster_id
            )
            # Status is added by the service and is not part of the input.
            del returned_policy["Status"]
            returned_policy.should.equal(auto_scaling_policy_with_cluster_id)
@mock_emr
def test_put_remove_auto_scaling_policy():
    """put_auto_scaling_policy attaches a policy to the CORE group; remove detaches it."""
    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    args["Instances"] = {"InstanceGroups": input_instance_groups}
    cluster_id = client.run_job_flow(**args)["JobFlowId"]

    def get_core_instance_group():
        # Re-query each time so that the current server-side state is observed.
        return [
            ig
            for ig in client.list_instance_groups(ClusterId=cluster_id)[
                "InstanceGroups"
            ]
            if ig["InstanceGroupType"] == "CORE"
        ][0]

    core_instance_group = get_core_instance_group()

    resp = client.put_auto_scaling_policy(
        ClusterId=cluster_id,
        InstanceGroupId=core_instance_group["Id"],
        AutoScalingPolicy=auto_scaling_policy,
    )

    auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy(
        auto_scaling_policy, cluster_id
    )
    # Status is added by the service and is not part of the submitted policy.
    del resp["AutoScalingPolicy"]["Status"]
    resp["AutoScalingPolicy"].should.equal(auto_scaling_policy_with_cluster_id)

    core_instance_group = get_core_instance_group()
    ("AutoScalingPolicy" in core_instance_group).should.equal(True)

    client.remove_auto_scaling_policy(
        ClusterId=cluster_id, InstanceGroupId=core_instance_group["Id"]
    )

    core_instance_group = get_core_instance_group()
    ("AutoScalingPolicy" not in core_instance_group).should.equal(True)
def _patch_cluster_id_placeholder_in_autoscaling_policy(
auto_scaling_policy, cluster_id
):
policy_copy = deepcopy(auto_scaling_policy)
for rule in policy_copy["Rules"]:
for dimension in rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"]:
dimension["Value"] = cluster_id
return policy_copy
@mock_emr @mock_emr
def test_run_job_flow_with_custom_ami(): def test_run_job_flow_with_custom_ami():
client = boto3.client("emr", region_name="us-east-1") client = boto3.client("emr", region_name="us-east-1")
@ -619,8 +731,11 @@ def test_instance_groups():
jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
base_instance_count = jf["Instances"]["InstanceCount"] base_instance_count = jf["Instances"]["InstanceCount"]
instance_groups_to_add = deepcopy(input_instance_groups[2:])
instance_groups_to_add[0]["AutoScalingPolicy"] = auto_scaling_policy
instance_groups_to_add[1]["AutoScalingPolicy"] = auto_scaling_policy
client.add_instance_groups( client.add_instance_groups(
JobFlowId=cluster_id, InstanceGroups=input_instance_groups[2:] JobFlowId=cluster_id, InstanceGroups=instance_groups_to_add
) )
jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
@ -629,8 +744,8 @@ def test_instance_groups():
) )
for x in jf["Instances"]["InstanceGroups"]: for x in jf["Instances"]["InstanceGroups"]:
y = input_groups[x["Name"]] y = input_groups[x["Name"]]
if hasattr(y, "BidPrice"): if "BidPrice" in y:
x["BidPrice"].should.equal("BidPrice") x["BidPrice"].should.equal(y["BidPrice"])
x["CreationDateTime"].should.be.a("datetime.datetime") x["CreationDateTime"].should.be.a("datetime.datetime")
# x['EndDateTime'].should.be.a('datetime.datetime') # x['EndDateTime'].should.be.a('datetime.datetime')
x.should.have.key("InstanceGroupId") x.should.have.key("InstanceGroupId")
@ -647,9 +762,18 @@ def test_instance_groups():
groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
for x in groups: for x in groups:
y = input_groups[x["Name"]] y = deepcopy(input_groups[x["Name"]])
if hasattr(y, "BidPrice"): if "BidPrice" in y:
x["BidPrice"].should.equal("BidPrice") x["BidPrice"].should.equal(y["BidPrice"])
if "AutoScalingPolicy" in y:
x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED")
returned_policy = dict(x["AutoScalingPolicy"])
del returned_policy["Status"]
for dimension in y["AutoScalingPolicy"]["Rules"]["Trigger"][
"CloudWatchAlarmDefinition"
]["Dimensions"]:
dimension["Value"] = cluster_id
returned_policy.should.equal(y["AutoScalingPolicy"])
if "EbsConfiguration" in y: if "EbsConfiguration" in y:
_do_assertion_ebs_configuration(x, y) _do_assertion_ebs_configuration(x, y)
# Configurations # Configurations