diff --git a/moto/emr/models.py b/moto/emr/models.py index 72c588166..63aadf105 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -7,7 +7,12 @@ from boto3 import Session from dateutil.parser import parse as dtparse from moto.core import BaseBackend, BaseModel from moto.emr.exceptions import EmrError -from .utils import random_instance_group_id, random_cluster_id, random_step_id +from .utils import ( + random_instance_group_id, + random_cluster_id, + random_step_id, + CamelToUnderscoresWalker, +) class FakeApplication(BaseModel): @@ -28,6 +33,7 @@ class FakeBootstrapAction(BaseModel): class FakeInstanceGroup(BaseModel): def __init__( self, + cluster_id, instance_count, instance_role, instance_type, @@ -36,8 +42,10 @@ class FakeInstanceGroup(BaseModel): id=None, bid_price=None, ebs_configuration=None, + auto_scaling_policy=None, ): self.id = id or random_instance_group_id() + self.cluster_id = cluster_id self.bid_price = bid_price self.market = market @@ -53,7 +61,7 @@ class FakeInstanceGroup(BaseModel): self.role = instance_role self.type = instance_type self.ebs_configuration = ebs_configuration - + self.auto_scaling_policy = auto_scaling_policy self.creation_datetime = datetime.now(pytz.utc) self.start_datetime = datetime.now(pytz.utc) self.ready_datetime = datetime.now(pytz.utc) @@ -63,6 +71,34 @@ class FakeInstanceGroup(BaseModel): def set_instance_count(self, instance_count): self.num_instances = instance_count + @property + def auto_scaling_policy(self): + return self._auto_scaling_policy + + @auto_scaling_policy.setter + def auto_scaling_policy(self, value): + if value is None: + self._auto_scaling_policy = value + return + self._auto_scaling_policy = CamelToUnderscoresWalker.parse(value) + self._auto_scaling_policy["status"] = {"state": "ATTACHED"} + # Replace the ${emr.clusterId} placeholder in any dimensions where it occurs.
+ if "rules" in self._auto_scaling_policy: + for rule in self._auto_scaling_policy["rules"]: + if ( + "trigger" in rule + and "cloud_watch_alarm_definition" in rule["trigger"] + and "dimensions" in rule["trigger"]["cloud_watch_alarm_definition"] + ): + for dimension in rule["trigger"]["cloud_watch_alarm_definition"][ + "dimensions" + ]: + if ( + "value" in dimension + and dimension["value"] == "${emr.clusterId}" + ): + dimension["value"] = self.cluster_id + class FakeStep(BaseModel): def __init__( @@ -319,7 +355,7 @@ class ElasticMapReduceBackend(BaseBackend): cluster = self.clusters[cluster_id] result_groups = [] for instance_group in instance_groups: - group = FakeInstanceGroup(**instance_group) + group = FakeInstanceGroup(cluster_id=cluster_id, **instance_group) self.instance_groups[group.id] = group cluster.add_instance_group(group) result_groups.append(group) @@ -465,6 +501,25 @@ class ElasticMapReduceBackend(BaseBackend): clusters.append(cluster) return clusters + def put_auto_scaling_policy(self, instance_group_id, auto_scaling_policy): + instance_groups = self.get_instance_groups( + instance_group_ids=[instance_group_id] + ) + if len(instance_groups) == 0: + return None + instance_group = instance_groups[0] + instance_group.auto_scaling_policy = auto_scaling_policy + return instance_group + + def remove_auto_scaling_policy(self, cluster_id, instance_group_id): + instance_groups = self.get_instance_groups( + instance_group_ids=[instance_group_id] + ) + if len(instance_groups) == 0: + return None + instance_group = instance_groups[0] + instance_group.auto_scaling_policy = None + emr_backends = {} for region in Session().get_available_regions("emr"): diff --git a/moto/emr/responses.py b/moto/emr/responses.py index d2b234ced..38a33519c 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -13,7 +13,7 @@ from moto.core.responses import xml_to_json_response from moto.core.utils import tags_from_query_string from .exceptions import EmrError from .models import emr_backends -from .utils import steps_from_query_string +from .utils import steps_from_query_string, Unflattener def generate_boto3_response(operation): @@ -76,6 +76,8 @@ class ElasticMapReduceResponse(BaseResponse): item["instance_count"] = int(item["instance_count"]) # Adding support to EbsConfiguration self._parse_ebs_configuration(item) + # Adding support for auto_scaling_policy + Unflattener.unflatten_complex_params(item, "auto_scaling_policy") instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups) template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups) @@ -329,6 +331,8 @@ class ElasticMapReduceResponse(BaseResponse): ig["instance_count"] = int(ig["instance_count"]) # Adding support to EbsConfiguration self._parse_ebs_configuration(ig) + # Adding support for auto_scaling_policy + Unflattener.unflatten_complex_params(ig, "auto_scaling_policy") self.backend.add_instance_groups(cluster.id, instance_groups) tags = self._get_list_prefix("Tags.member") @@ -442,6 +446,25 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE) return template.render() + @generate_boto3_response("PutAutoScalingPolicy") + def put_auto_scaling_policy(self): + cluster_id = self._get_param("ClusterId") + instance_group_id = self._get_param("InstanceGroupId") + auto_scaling_policy = self._get_param("AutoScalingPolicy") + instance_group = self.backend.put_auto_scaling_policy( + instance_group_id, 
auto_scaling_policy + ) + template = self.response_template(PUT_AUTO_SCALING_POLICY) + return template.render(cluster_id=cluster_id, instance_group=instance_group) + + @generate_boto3_response("RemoveAutoScalingPolicy") + def remove_auto_scaling_policy(self): + cluster_id = self._get_param("ClusterId") + instance_group_id = self._get_param("InstanceGroupId") + instance_group = self.backend.put_auto_scaling_policy(instance_group_id, None) + template = self.response_template(REMOVE_AUTO_SCALING_POLICY) + return template.render(cluster_id=cluster_id, instance_group=instance_group) + ADD_INSTANCE_GROUPS_TEMPLATE = """ @@ -854,6 +877,107 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """ + + {{cluster_id}} + {{instance_group.id}} + {% if instance_group.auto_scaling_policy is not none %} + + {% if instance_group.auto_scaling_policy.constraints is not none %} + + {% if instance_group.auto_scaling_policy.constraints.min_capacity is not none %} + {{instance_group.auto_scaling_policy.constraints.min_capacity}} + {% endif %} + {% if instance_group.auto_scaling_policy.constraints.max_capacity is not none %} + {{instance_group.auto_scaling_policy.constraints.max_capacity}} + {% endif %} + + {% endif %} + {% if instance_group.auto_scaling_policy.rules is not none %} + + {% for rule in instance_group.auto_scaling_policy.rules %} + + {% if 'name' in rule %} + {{rule['name']}} + {% endif %} + {% if 'description' in rule %} + {{rule['description']}} + {% endif %} + {% if 'action' in rule %} + + {% if 'market' in rule['action'] %} + {{rule['action']['market']}} + {% endif %} + {% if 'simple_scaling_policy_configuration' in rule['action'] %} + + {% if 'adjustment_type' in rule['action']['simple_scaling_policy_configuration'] %} + {{rule['action']['simple_scaling_policy_configuration']['adjustment_type']}} + {% endif %} + {% if 'scaling_adjustment' in rule['action']['simple_scaling_policy_configuration'] %} + {{rule['action']['simple_scaling_policy_configuration']['scaling_adjustment']}} + {% endif %} + {% if 'cool_down' in rule['action']['simple_scaling_policy_configuration'] %} + {{rule['action']['simple_scaling_policy_configuration']['cool_down']}} + {% endif %} + + {% endif %} + + {% endif %} + {% if 'trigger' in rule %} + + {% if 'cloud_watch_alarm_definition' in rule['trigger'] %} + + {% if 'comparison_operator' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['comparison_operator']}} + {% endif %} + {% if 'evaluation_periods' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['evaluation_periods']}} + {% endif %} + {% if 'metric_name' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['metric_name']}} + {% endif %} + {% if 'namespace' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['namespace']}} + {% endif %} + {% if 'period' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['period']}} + {% endif %} + {% if 'statistic' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['statistic']}} + {% endif %} + {% if 'threshold' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['threshold']}} + {% endif %} + {% if 'unit' in rule['trigger']['cloud_watch_alarm_definition'] %} + {{rule['trigger']['cloud_watch_alarm_definition']['unit']}} + {% 
endif %} + {% if 'dimensions' in rule['trigger']['cloud_watch_alarm_definition'] %} + + {% for dimension in rule['trigger']['cloud_watch_alarm_definition']['dimensions'] %} + + {% if 'key' in dimension %} + {{dimension['key']}} + {% endif %} + {% if 'value' in dimension %} + {{dimension['value']}} + {% endif %} + + {% endfor %} + + {% endif %} + + {% endif %} + + {% endif %} + + {% endfor %} + + {% endif %} + {% if instance_group.auto_scaling_policy.status is not none %} + + {% if 'state' in instance_group.auto_scaling_policy.status %} + {{instance_group.auto_scaling_policy.status['state']}} + {% endif %} + + {% endif %} + + {% endif %} + + + d47379d9-b505-49af-9335-a68950d82535 + +""" + +REMOVE_AUTO_SCALING_POLICY = """ + + c04a1042-5340-4c0a-a7b5-7779725ce4f7 + +""" diff --git a/moto/emr/utils.py b/moto/emr/utils.py index fb33214c8..4d9da8434 100644 --- a/moto/emr/utils.py +++ b/moto/emr/utils.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import random import string +from moto.core.utils import camelcase_to_underscores import six @@ -37,3 +38,109 @@ def steps_from_query_string(querystring_dict): idx += 1 steps.append(step) return steps + + +class Unflattener: + @staticmethod + def unflatten_complex_params(input_dict, param_name): + """ Function to unflatten (portions of) dicts with complex keys. The moto request parser flattens the incoming + request bodies, which is generally helpful, but for nested dicts/lists can result in a hard-to-manage + parameter explosion. This function allows one to selectively unflatten a set of dict keys, replacing them + with a deep dict/list structure named identically to the root component in the complex name. + + Complex keys are composed of multiple components + separated by periods. Components may be prefixed with _, which is stripped. List indexes are represented + with two components, 'member' and the index number.
""" + items_to_process = {} + for k in input_dict.keys(): + if k.startswith(param_name): + items_to_process[k] = input_dict[k] + if len(items_to_process) == 0: + return + + for k in items_to_process.keys(): + del input_dict[k] + + for k in items_to_process.keys(): + Unflattener._set_deep(k, input_dict, items_to_process[k]) + + @staticmethod + def _set_deep(complex_key, container, value): + keys = complex_key.split(".") + keys.reverse() + + while len(keys) > 0: + if len(keys) == 1: + key = keys.pop().strip("_") + Unflattener._add_to_container(container, key, value) + else: + key = keys.pop().strip("_") + if keys[-1] == "member": + keys.pop() + if not Unflattener._key_in_container(container, key): + container = Unflattener._add_to_container(container, key, []) + else: + container = Unflattener._get_child(container, key) + else: + if not Unflattener._key_in_container(container, key): + container = Unflattener._add_to_container(container, key, {}) + else: + container = Unflattener._get_child(container, key) + + @staticmethod + def _add_to_container(container, key, value): + if type(container) is dict: + container[key] = value + elif type(container) is list: + i = int(key) + while len(container) < i: + container.append(None) + container[i - 1] = value + return value + + @staticmethod + def _get_child(container, key): + if type(container) is dict: + return container[key] + elif type(container) is list: + i = int(key) + return container[i - 1] + + @staticmethod + def _key_in_container(container, key): + if type(container) is dict: + return key in container + elif type(container) is list: + i = int(key) + return len(container) >= i + + +class CamelToUnderscoresWalker: + """A class to convert the keys in dict/list hierarchical data structures from CamelCase to snake_case (underscores)""" + + @staticmethod + def parse(x): + if isinstance(x, dict): + return CamelToUnderscoresWalker.parse_dict(x) + elif isinstance(x, list): + return CamelToUnderscoresWalker.parse_list(x) + else: + return CamelToUnderscoresWalker.parse_scalar(x) + + @staticmethod + def parse_dict(x): + temp = {} + for key in x.keys(): + temp[camelcase_to_underscores(key)] = CamelToUnderscoresWalker.parse(x[key]) + return temp + + @staticmethod + def parse_list(x): + temp = [] + for i in x: + temp.append(CamelToUnderscoresWalker.parse(i)) + return temp + + @staticmethod + def parse_scalar(x): + return x diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index adfc3fa9c..3f577c69a 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -476,6 +476,118 @@ def test_run_job_flow_with_instance_groups(): _do_assertion_ebs_configuration(x, y) +auto_scaling_policy = { + "Constraints": {"MinCapacity": 2, "MaxCapacity": 10}, + "Rules": [ + { + "Name": "Default-scale-out", + "Description": "Replicates the default scale-out rule in the console for YARN memory.", + "Action": { + "SimpleScalingPolicyConfiguration": { + "AdjustmentType": "CHANGE_IN_CAPACITY", + "ScalingAdjustment": 1, + "CoolDown": 300, + } + }, + "Trigger": { + "CloudWatchAlarmDefinition": { + "ComparisonOperator": "LESS_THAN", + "EvaluationPeriods": 1, + "MetricName": "YARNMemoryAvailablePercentage", + "Namespace": "AWS/ElasticMapReduce", + "Period": 300, + "Threshold": 15.0, + "Statistic": "AVERAGE", + "Unit": "PERCENT", + "Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}], + } + }, + } + ], +} + + +@mock_emr +def test_run_job_flow_with_instance_groups_with_autoscaling(): + input_groups = dict((g["Name"], 
g) for g in input_instance_groups) + + input_groups["core"]["AutoScalingPolicy"] = auto_scaling_policy + input_groups["task-1"]["AutoScalingPolicy"] = auto_scaling_policy + + client = boto3.client("emr", region_name="us-east-1") + args = deepcopy(run_job_flow_args) + args["Instances"] = {"InstanceGroups": input_instance_groups} + cluster_id = client.run_job_flow(**args)["JobFlowId"] + groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] + for x in groups: + y = deepcopy(input_groups[x["Name"]]) + if "AutoScalingPolicy" in y: + x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED") + returned_policy = deepcopy(x["AutoScalingPolicy"]) + auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy( + y["AutoScalingPolicy"], cluster_id + ) + del returned_policy["Status"] + returned_policy.should.equal(auto_scaling_policy_with_cluster_id) + + +@mock_emr +def test_put_remove_auto_scaling_policy(): + input_groups = dict((g["Name"], g) for g in input_instance_groups) + client = boto3.client("emr", region_name="us-east-1") + args = deepcopy(run_job_flow_args) + args["Instances"] = {"InstanceGroups": input_instance_groups} + cluster_id = client.run_job_flow(**args)["JobFlowId"] + + core_instance_group = [ + ig + for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] + if ig["InstanceGroupType"] == "CORE" + ][0] + + resp = client.put_auto_scaling_policy( + ClusterId=cluster_id, + InstanceGroupId=core_instance_group["Id"], + AutoScalingPolicy=auto_scaling_policy, + ) + + auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy( + auto_scaling_policy, cluster_id + ) + del resp["AutoScalingPolicy"]["Status"] + resp["AutoScalingPolicy"].should.equal(auto_scaling_policy_with_cluster_id) + + core_instance_group = [ + ig + for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] + if ig["InstanceGroupType"] == "CORE" + ][0] + + ("AutoScalingPolicy" in core_instance_group).should.equal(True) + + client.remove_auto_scaling_policy( + ClusterId=cluster_id, InstanceGroupId=core_instance_group["Id"] + ) + + core_instance_group = [ + ig + for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] + if ig["InstanceGroupType"] == "CORE" + ][0] + + ("AutoScalingPolicy" not in core_instance_group).should.equal(True) + + +def _patch_cluster_id_placeholder_in_autoscaling_policy( + auto_scaling_policy, cluster_id +): + policy_copy = deepcopy(auto_scaling_policy) + for rule in policy_copy["Rules"]: + for dimension in rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"]: + dimension["Value"] = cluster_id + return policy_copy + + @mock_emr def test_run_job_flow_with_custom_ami(): client = boto3.client("emr", region_name="us-east-1") @@ -619,8 +731,11 @@ def test_instance_groups(): jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] base_instance_count = jf["Instances"]["InstanceCount"] + instance_groups_to_add = deepcopy(input_instance_groups[2:]) + instance_groups_to_add[0]["AutoScalingPolicy"] = auto_scaling_policy + instance_groups_to_add[1]["AutoScalingPolicy"] = auto_scaling_policy client.add_instance_groups( - JobFlowId=cluster_id, InstanceGroups=input_instance_groups[2:] + JobFlowId=cluster_id, InstanceGroups=instance_groups_to_add ) jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] @@ -629,8 +744,8 @@ def test_instance_groups(): ) for x in jf["Instances"]["InstanceGroups"]: y = input_groups[x["Name"]] - if 
hasattr(y, "BidPrice"): - x["BidPrice"].should.equal("BidPrice") + if "BidPrice" in y: + x["BidPrice"].should.equal(y["BidPrice"]) x["CreationDateTime"].should.be.a("datetime.datetime") # x['EndDateTime'].should.be.a('datetime.datetime') x.should.have.key("InstanceGroupId") @@ -647,9 +762,18 @@ def test_instance_groups(): groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] for x in groups: - y = input_groups[x["Name"]] - if hasattr(y, "BidPrice"): - x["BidPrice"].should.equal("BidPrice") + y = deepcopy(input_groups[x["Name"]]) + if "BidPrice" in y: + x["BidPrice"].should.equal(y["BidPrice"]) + if "AutoScalingPolicy" in y: + x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED") + returned_policy = dict(x["AutoScalingPolicy"]) + del returned_policy["Status"] + for rule in y["AutoScalingPolicy"]["Rules"]: + for dimension in rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"]: + dimension["Value"] = cluster_id + returned_policy.should.equal(y["AutoScalingPolicy"]) if "EbsConfiguration" in y: _do_assertion_ebs_configuration(x, y) # Configurations
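
For reference, a minimal sketch of how the new Unflattener.unflatten_complex_params helper is expected to behave, assuming flattened keys of the shape its docstring describes (period-separated components, leading underscores stripped, list indexes spelled as member.<n>); the exact key spellings below are illustrative rather than captured from a real request:

from moto.emr.utils import Unflattener

# Flattened parameters of the shape the docstring describes; only the keys
# rooted at "auto_scaling_policy" are unflattened, other keys are left alone.
params = {
    "instance_count": "3",
    "auto_scaling_policy._constraints._min_capacity": "2",
    "auto_scaling_policy._constraints._max_capacity": "10",
    "auto_scaling_policy._rules.member.1._name": "Default-scale-out",
}

Unflattener.unflatten_complex_params(params, "auto_scaling_policy")

# The flat keys are removed and replaced by one nested dict/list structure
# stored under the root component name.
assert params["instance_count"] == "3"
assert params["auto_scaling_policy"] == {
    "constraints": {"min_capacity": "2", "max_capacity": "10"},
    "rules": [{"name": "Default-scale-out"}],
}
assert not any(k.startswith("auto_scaling_policy.") for k in params)

This is what lets the run_job_flow and add_instance_groups handlers pass a ready-made nested auto_scaling_policy dict on to the backend instead of the raw flattened parameters.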
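
Likewise, a small sketch of the CamelToUnderscoresWalker conversion that FakeInstanceGroup.auto_scaling_policy builds on: the CamelCase policy document a client sends is rewritten with snake_case keys (the names the new Jinja template indexes into), after which the setter attaches a status of ATTACHED and substitutes the owning cluster id for the ${emr.clusterId} dimension placeholder. The policy literal below is trimmed down from the test fixture, and the assertions restate the expected shape rather than quote real output:

from moto.emr.utils import CamelToUnderscoresWalker

# A trimmed-down policy as boto3 sends it (CamelCase keys throughout).
policy = {
    "Constraints": {"MinCapacity": 2, "MaxCapacity": 10},
    "Rules": [
        {
            "Name": "Default-scale-out",
            "Trigger": {
                "CloudWatchAlarmDefinition": {
                    "MetricName": "YARNMemoryAvailablePercentage",
                    "Dimensions": [
                        {"Key": "JobFlowId", "Value": "${emr.clusterId}"}
                    ],
                }
            },
        }
    ],
}

parsed = CamelToUnderscoresWalker.parse(policy)

# Every dict key is converted; list structure and scalar values are untouched.
assert parsed["constraints"] == {"min_capacity": 2, "max_capacity": 10}
alarm = parsed["rules"][0]["trigger"]["cloud_watch_alarm_definition"]
assert alarm["metric_name"] == "YARNMemoryAvailablePercentage"
assert alarm["dimensions"] == [{"key": "JobFlowId", "value": "${emr.clusterId}"}]
# The auto_scaling_policy setter then adds {"status": {"state": "ATTACHED"}}
# and rewrites the "${emr.clusterId}" dimension value to the real cluster id.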