diff --git a/moto/emr/models.py b/moto/emr/models.py index 63aadf105..5a34c4d10 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -146,6 +146,7 @@ class FakeCluster(BaseModel): requested_ami_version=None, running_ami_version=None, custom_ami_id=None, + step_concurrency_level=1, ): self.id = cluster_id or random_cluster_id() emr_backend.clusters[self.id] = self @@ -236,6 +237,7 @@ class FakeCluster(BaseModel): self.role = job_flow_role or "EMRJobflowDefault" self.service_role = service_role + self.step_concurrency_level = step_concurrency_level self.creation_datetime = datetime.now(pytz.utc) self.start_datetime = None @@ -469,6 +471,11 @@ class ElasticMapReduceBackend(BaseBackend): ) return steps[start_idx : start_idx + max_items], marker + def modify_cluster(self, cluster_id, step_concurrency_level): + cluster = self.clusters[cluster_id] + cluster.step_concurrency_level = step_concurrency_level + return cluster + def modify_instance_groups(self, instance_groups): result_groups = [] for instance_group in instance_groups: diff --git a/moto/emr/responses.py b/moto/emr/responses.py index 38a33519c..9ced4569b 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -127,9 +127,6 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) return template.render(clusters=clusters) - def describe_security_configuration(self): - raise NotImplementedError - @generate_boto3_response("DescribeStep") def describe_step(self): cluster_id = self._get_param("ClusterId") @@ -185,6 +182,17 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(LIST_STEPS_TEMPLATE) return template.render(steps=steps, marker=marker) + @generate_boto3_response("ModifyCluster") + def modify_cluster(self): + cluster_id = self._get_param("ClusterId") + step_concurrency_level = self._get_param("StepConcurrencyLevel") + cluster = self.backend.modify_cluster(cluster_id, step_concurrency_level) + template = self.response_template(MODIFY_CLUSTER_TEMPLATE) + return template.render(cluster=cluster) + + def describe_security_configuration(self): + raise NotImplementedError + @generate_boto3_response("ModifyInstanceGroups") def modify_instance_groups(self): instance_groups = self._get_list_prefix("InstanceGroups.member") @@ -315,6 +323,10 @@ class ElasticMapReduceResponse(BaseResponse): template="error_json", ) + step_concurrency_level = self._get_param("StepConcurrencyLevel") + if step_concurrency_level: + kwargs["step_concurrency_level"] = step_concurrency_level + cluster = self.backend.run_job_flow(**kwargs) applications = self._get_list_prefix("Applications.member") @@ -591,6 +603,7 @@ DESCRIBE_CLUSTER_TEMPLATE = """ + + {{ cluster.step_concurrency_level }} + + + 0751c837-e78d-4aef-95c9-9c4d29a092ff + + +""" + MODIFY_INSTANCE_GROUPS_TEMPLATE = """ 2690d7eb-ed86-11dd-9877-6fad448a8419 diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index 3f577c69a..af6939f80 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -631,6 +631,36 @@ def test_run_job_flow_with_custom_ami(): resp["Cluster"]["CustomAmiId"].should.equal("MyEmrCustomAmi") +@mock_emr +def test_run_job_flow_with_step_concurrency(): + client = boto3.client("emr", region_name="us-east-1") + args = deepcopy(run_job_flow_args) + args["StepConcurrencyLevel"] = 2 + cluster_id = client.run_job_flow(**args)["JobFlowId"] + resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] + resp["Name"].should.equal(args["Name"]) + resp["Status"]["State"].should.equal("WAITING") + resp["StepConcurrencyLevel"].should.equal(2) + + +@mock_emr +def test_modify_cluster(): + client = boto3.client("emr", region_name="us-east-1") + args = deepcopy(run_job_flow_args) + args["StepConcurrencyLevel"] = 2 + cluster_id = client.run_job_flow(**args)["JobFlowId"] + resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] + resp["Name"].should.equal(args["Name"]) + resp["Status"]["State"].should.equal("WAITING") + resp["StepConcurrencyLevel"].should.equal(2) + + resp = client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=4) + resp["StepConcurrencyLevel"].should.equal(4) + + resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] + resp["StepConcurrencyLevel"].should.equal(4) + + @mock_emr def test_set_termination_protection(): client = boto3.client("emr", region_name="us-east-1")