EMR: Support for StepConcurrencyLevel. (#3351)

Co-authored-by: Joseph Weitekamp <jweite@amazon.com>
This commit is contained in:
jweite 2020-10-02 09:07:13 -04:00 committed by GitHub
parent a668349a70
commit 9bc6bded6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 3 deletions

View File

@ -146,6 +146,7 @@ class FakeCluster(BaseModel):
requested_ami_version=None,
running_ami_version=None,
custom_ami_id=None,
step_concurrency_level=1,
):
self.id = cluster_id or random_cluster_id()
emr_backend.clusters[self.id] = self
@ -236,6 +237,7 @@ class FakeCluster(BaseModel):
self.role = job_flow_role or "EMRJobflowDefault"
self.service_role = service_role
self.step_concurrency_level = step_concurrency_level
self.creation_datetime = datetime.now(pytz.utc)
self.start_datetime = None
@ -469,6 +471,11 @@ class ElasticMapReduceBackend(BaseBackend):
)
return steps[start_idx : start_idx + max_items], marker
def modify_cluster(self, cluster_id, step_concurrency_level):
cluster = self.clusters[cluster_id]
cluster.step_concurrency_level = step_concurrency_level
return cluster
def modify_instance_groups(self, instance_groups):
result_groups = []
for instance_group in instance_groups:

View File

@ -127,9 +127,6 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE)
return template.render(clusters=clusters)
def describe_security_configuration(self):
raise NotImplementedError
@generate_boto3_response("DescribeStep")
def describe_step(self):
cluster_id = self._get_param("ClusterId")
@ -185,6 +182,17 @@ class ElasticMapReduceResponse(BaseResponse):
template = self.response_template(LIST_STEPS_TEMPLATE)
return template.render(steps=steps, marker=marker)
@generate_boto3_response("ModifyCluster")
def modify_cluster(self):
cluster_id = self._get_param("ClusterId")
step_concurrency_level = self._get_param("StepConcurrencyLevel")
cluster = self.backend.modify_cluster(cluster_id, step_concurrency_level)
template = self.response_template(MODIFY_CLUSTER_TEMPLATE)
return template.render(cluster=cluster)
def describe_security_configuration(self):
raise NotImplementedError
@generate_boto3_response("ModifyInstanceGroups")
def modify_instance_groups(self):
instance_groups = self._get_list_prefix("InstanceGroups.member")
@ -315,6 +323,10 @@ class ElasticMapReduceResponse(BaseResponse):
template="error_json",
)
step_concurrency_level = self._get_param("StepConcurrencyLevel")
if step_concurrency_level:
kwargs["step_concurrency_level"] = step_concurrency_level
cluster = self.backend.run_job_flow(**kwargs)
applications = self._get_list_prefix("Applications.member")
@ -591,6 +603,7 @@ DESCRIBE_CLUSTER_TEMPLATE = """<DescribeClusterResponse xmlns="http://elasticmap
</Tags>
<TerminationProtected>{{ cluster.termination_protected|lower }}</TerminationProtected>
<VisibleToAllUsers>{{ cluster.visible_to_all_users|lower }}</VisibleToAllUsers>
<StepConcurrencyLevel>{{ cluster.step_concurrency_level }}</StepConcurrencyLevel>
</Cluster>
</DescribeClusterResult>
<ResponseMetadata>
@ -1075,6 +1088,16 @@ LIST_STEPS_TEMPLATE = """<ListStepsResponse xmlns="http://elasticmapreduce.amazo
</ResponseMetadata>
</ListStepsResponse>"""
MODIFY_CLUSTER_TEMPLATE = """<ModifyClusterResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ModifyClusterResult>
<StepConcurrencyLevel>{{ cluster.step_concurrency_level }}</StepConcurrencyLevel>
</ModifyClusterResult>
<ResponseMetadata>
<RequestId>0751c837-e78d-4aef-95c9-9c4d29a092ff</RequestId>
</ResponseMetadata>
</ModifyClusterResponse>
"""
MODIFY_INSTANCE_GROUPS_TEMPLATE = """<ModifyInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ResponseMetadata>
<RequestId>2690d7eb-ed86-11dd-9877-6fad448a8419</RequestId>

View File

@ -631,6 +631,36 @@ def test_run_job_flow_with_custom_ami():
resp["Cluster"]["CustomAmiId"].should.equal("MyEmrCustomAmi")
@mock_emr
def test_run_job_flow_with_step_concurrency():
client = boto3.client("emr", region_name="us-east-1")
args = deepcopy(run_job_flow_args)
args["StepConcurrencyLevel"] = 2
cluster_id = client.run_job_flow(**args)["JobFlowId"]
resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
resp["Name"].should.equal(args["Name"])
resp["Status"]["State"].should.equal("WAITING")
resp["StepConcurrencyLevel"].should.equal(2)
@mock_emr
def test_modify_cluster():
client = boto3.client("emr", region_name="us-east-1")
args = deepcopy(run_job_flow_args)
args["StepConcurrencyLevel"] = 2
cluster_id = client.run_job_flow(**args)["JobFlowId"]
resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
resp["Name"].should.equal(args["Name"])
resp["Status"]["State"].should.equal("WAITING")
resp["StepConcurrencyLevel"].should.equal(2)
resp = client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=4)
resp["StepConcurrencyLevel"].should.equal(4)
resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
resp["StepConcurrencyLevel"].should.equal(4)
@mock_emr
def test_set_termination_protection():
client = boto3.client("emr", region_name="us-east-1")