Support EMR clusters with multiple master nodes (#4019)

Reference: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-ha-launch.html
This commit is contained in:
Brian Pandola 2021-06-17 22:04:21 -07:00 committed by GitHub
parent 2f23807a35
commit a95ca81e76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 0 deletions

View File

@ -332,6 +332,15 @@ class FakeCluster(BaseModel):
if self.master_instance_group_id:
raise Exception("Cannot add another master instance group")
self.master_instance_group_id = instance_group.id
num_master_nodes = instance_group.num_instances
if num_master_nodes > 1:
# Cluster is HA
if num_master_nodes != 3:
raise ValidationException(
"Master instance group must have exactly 3 instances for HA clusters."
)
self.keep_job_flow_alive_when_no_steps = True
self.termination_protected = True
if instance_group.role == "CORE":
if self.core_instance_group_id:
raise Exception("Cannot add another core instance group")

View File

@ -1126,3 +1126,60 @@ def test_security_configurations():
ex.value.response["Error"]["Message"].should.match(
r"Security configuration with name .* does not exist."
)
@mock_emr
def test_run_job_flow_with_invalid_number_of_master_nodes_raises_error():
client = boto3.client("emr", region_name="us-east-1")
params = dict(
Name="test-cluster",
Instances={
"InstanceGroups": [
{
"InstanceCount": 2,
"InstanceRole": "MASTER",
"InstanceType": "c1.medium",
"Market": "ON_DEMAND",
"Name": "master",
}
]
},
)
with pytest.raises(ClientError) as ex:
client.run_job_flow(**params)
error = ex.value.response["Error"]
error["Code"].should.equal("ValidationException")
error["Message"].should.equal(
"Master instance group must have exactly 3 instances for HA clusters."
)
@mock_emr
def test_run_job_flow_with_multiple_master_nodes():
client = boto3.client("emr", region_name="us-east-1")
params = dict(
Name="test-cluster",
Instances={
"InstanceGroups": [
{
"InstanceCount": 3,
"InstanceRole": "MASTER",
"InstanceType": "c1.medium",
"Market": "ON_DEMAND",
"Name": "master",
}
],
"KeepJobFlowAliveWhenNoSteps": False,
"TerminationProtected": False,
},
)
cluster_id = client.run_job_flow(**params)["JobFlowId"]
cluster = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
cluster["AutoTerminate"].should.equal(False)
cluster["TerminationProtected"].should.equal(True)
groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
master_instance_group = next(
group for group in groups if group["InstanceGroupType"] == "MASTER"
)
master_instance_group["RequestedInstanceCount"].should.equal(3)
master_instance_group["RunningInstanceCount"].should.equal(3)