CloudFormation: Support AWS::EMR::InstanceGroupConfig (#7446)

This commit is contained in:
Bert Blommers 2024-03-08 22:15:19 +00:00 committed by GitHub
parent b13e493823
commit cb60935eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 241 additions and 6 deletions

View File

@ -53,7 +53,7 @@ class FakeInstance(BaseModel):
self.instance_fleet_id = instance_fleet_id
class FakeInstanceGroup(BaseModel):
class FakeInstanceGroup(CloudFormationModel):
def __init__(
self,
cluster_id: str,
@ -82,7 +82,7 @@ class FakeInstanceGroup(BaseModel):
self.name = name
self.num_instances = instance_count
self.role = instance_role
self.type = instance_type
self.instance_type = instance_type
self.ebs_configuration = ebs_configuration
self.auto_scaling_policy = auto_scaling_policy
self.creation_datetime = datetime.now(timezone.utc)
@ -122,6 +122,45 @@ class FakeInstanceGroup(BaseModel):
):
dimension["value"] = self.cluster_id
@property
def physical_resource_id(self) -> str:
return self.id
@staticmethod
def cloudformation_type() -> str:
return "AWS::EMR::InstanceGroupConfig"
@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FakeInstanceGroup":
properties = cloudformation_json["Properties"]
job_flow_id = properties["JobFlowId"]
ebs_config = properties.get("EbsConfiguration")
if ebs_config:
ebs_config = CamelToUnderscoresWalker.parse_dict(ebs_config)
props = {
"instance_count": properties.get("InstanceCount"),
"instance_role": properties.get("InstanceRole"),
"instance_type": properties.get("InstanceType"),
"market": properties.get("Market"),
"bid_price": properties.get("BidPrice"),
"name": properties.get("Name"),
"auto_scaling_policy": properties.get("AutoScalingPolicy"),
"ebs_configuration": ebs_config,
}
emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]
return emr_backend.add_instance_groups(
cluster_id=job_flow_id, instance_groups=[props]
)[0]
class FakeStep(BaseModel):
def __init__(
@ -292,11 +331,15 @@ class FakeCluster(CloudFormationModel):
@property
def master_instance_type(self) -> str:
return self.emr_backend.instance_groups[self.master_instance_group_id].type # type: ignore
return self.emr_backend.instance_groups[
self.master_instance_group_id # type: ignore
].instance_type
@property
def slave_instance_type(self) -> str:
return self.emr_backend.instance_groups[self.core_instance_group_id].type # type: ignore
return self.emr_backend.instance_groups[
self.core_instance_group_id # type: ignore
].instance_type
@property
def instance_count(self) -> int:

View File

@ -758,7 +758,7 @@ DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elastic
<InstanceRequestCount>{{ instance_group.num_instances }}</InstanceRequestCount>
<InstanceRole>{{ instance_group.role }}</InstanceRole>
<InstanceRunningCount>{{ instance_group.num_instances }}</InstanceRunningCount>
<InstanceType>{{ instance_group.type }}</InstanceType>
<InstanceType>{{ instance_group.instance_type }}</InstanceType>
<LastStateChangeReason/>
<Market>{{ instance_group.market }}</Market>
<Name>{{ instance_group.name }}</Name>
@ -1084,7 +1084,7 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """<ListInstanceGroupsResponse xmlns="http://ela
{% endif %}
<Id>{{ instance_group.id }}</Id>
<InstanceGroupType>{{ instance_group.role }}</InstanceGroupType>
<InstanceType>{{ instance_group.type }}</InstanceType>
<InstanceType>{{ instance_group.instance_type }}</InstanceType>
<Market>{{ instance_group.market }}</Market>
<Name>{{ instance_group.name }}</Name>
<RequestedInstanceCount>{{ instance_group.num_instances }}</RequestedInstanceCount>

View File

@ -632,3 +632,195 @@ def test_create_cluster_with_kerberos_attrs():
emr.describe_security_configuration(Name="mysecconfig")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidRequestException"
template_with_simple_instance_group_config = {
"Resources": {
"Cluster1": {
"Type": "AWS::EMR::Cluster",
"Properties": {
"Instances": {
"CoreInstanceGroup": {
"InstanceCount": 3,
"InstanceType": "m3g",
}
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"Name": "my cluster",
"ServiceRole": "EMR_DefaultRole",
},
},
"TestInstanceGroupConfig": {
"Type": "AWS::EMR::InstanceGroupConfig",
"Properties": {
"InstanceCount": 2,
"InstanceType": "m3.xlarge",
"InstanceRole": "TASK",
"Market": "ON_DEMAND",
"Name": "cfnTask2",
"JobFlowId": {"Ref": "Cluster1"},
},
},
},
}
@mock_aws
def test_create_simple_instance_group():
region = "us-east-1"
cf = boto3.client("cloudformation", region_name=region)
emr = boto3.client("emr", region_name=region)
cf.create_stack(
StackName="teststack",
TemplateBody=json.dumps(template_with_simple_instance_group_config),
)
# Verify resources
res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0]
cluster_id = res["PhysicalResourceId"]
ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0]
assert ig["Name"] == "cfnTask2"
assert ig["Market"] == "ON_DEMAND"
assert ig["InstanceGroupType"] == "TASK"
assert ig["InstanceType"] == "m3.xlarge"
template_with_advanced_instance_group_config = {
"Resources": {
"Cluster1": {
"Type": "AWS::EMR::Cluster",
"Properties": {
"Instances": {
"CoreInstanceGroup": {
"InstanceCount": 3,
"InstanceType": "m3g",
}
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"Name": "my cluster",
"ServiceRole": "EMR_DefaultRole",
},
},
"TestInstanceGroupConfig": {
"Type": "AWS::EMR::InstanceGroupConfig",
"Properties": {
"InstanceCount": 1,
"InstanceType": "m4.large",
"InstanceRole": "TASK",
"Market": "ON_DEMAND",
"Name": "cfnTask3",
"JobFlowId": {"Ref": "Cluster1"},
"EbsConfiguration": {
"EbsOptimized": True,
"EbsBlockDeviceConfigs": [
{
"VolumesPerInstance": 2,
"VolumeSpecification": {
"Iops": 10,
"SizeInGB": 50,
"Throughput": 100,
"VolumeType": "gp3",
},
}
],
},
"AutoScalingPolicy": {
"Constraints": {"MinCapacity": 1, "MaxCapacity": 4},
"Rules": [
{
"Name": "Scale-out",
"Description": "Scale-out policy",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300,
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"Dimensions": [
{
"Key": "JobFlowId",
"Value": "${emr.clusterId}",
}
],
"EvaluationPeriods": 1,
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"ComparisonOperator": "LESS_THAN",
"Statistic": "AVERAGE",
"Threshold": 15,
"Unit": "PERCENT",
"MetricName": "YARNMemoryAvailablePercentage",
}
},
},
{
"Name": "Scale-in",
"Description": "Scale-in policy",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": -1,
"CoolDown": 300,
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"Dimensions": [
{
"Key": "JobFlowId",
"Value": "${emr.clusterId}",
}
],
"EvaluationPeriods": 1,
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"ComparisonOperator": "GREATER_THAN",
"Statistic": "AVERAGE",
"Threshold": 75,
"Unit": "PERCENT",
"MetricName": "YARNMemoryAvailablePercentage",
}
},
},
],
},
},
},
},
}
@mock_aws
def test_create_advanced_instance_group():
region = "us-east-1"
cf = boto3.client("cloudformation", region_name=region)
emr = boto3.client("emr", region_name=region)
cf.create_stack(
StackName="teststack",
TemplateBody=json.dumps(template_with_advanced_instance_group_config),
)
# Verify resources
res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0]
cluster_id = res["PhysicalResourceId"]
ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0]
assert ig["Name"] == "cfnTask3"
assert ig["Market"] == "ON_DEMAND"
assert ig["InstanceGroupType"] == "TASK"
assert ig["InstanceType"] == "m4.large"
as_policy = ig["AutoScalingPolicy"]
assert as_policy["Status"] == {"State": "ATTACHED"}
assert as_policy["Constraints"] == {"MinCapacity": 1, "MaxCapacity": 4}
ebs = ig["EbsBlockDevices"]
assert ebs[0]["VolumeSpecification"] == {
"VolumeType": "gp3",
"Iops": 10,
"SizeInGB": 50,
}