Add job_flow_role param or EMR
This commit is contained in:
parent
ded410460f
commit
e251fd8930
@ -2,6 +2,8 @@ from moto.core import BaseBackend
|
||||
|
||||
from .utils import random_job_id, random_instance_group_id
|
||||
|
||||
DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault'
|
||||
|
||||
|
||||
class FakeInstanceGroup(object):
|
||||
def __init__(self, id, instance_count, instance_role, instance_type, market, name, bid_price=None):
|
||||
@ -48,10 +50,11 @@ class FakeStep(object):
|
||||
|
||||
|
||||
class FakeJobFlow(object):
|
||||
def __init__(self, job_id, name, log_uri, steps, instance_attrs):
|
||||
def __init__(self, job_id, name, log_uri, job_flow_role, steps, instance_attrs):
|
||||
self.id = job_id
|
||||
self.name = name
|
||||
self.log_uri = log_uri
|
||||
self.role = job_flow_role or DEFAULT_JOB_FLOW_ROLE
|
||||
self.state = "STARTING"
|
||||
self.steps = []
|
||||
self.add_steps(steps)
|
||||
@ -119,9 +122,9 @@ class ElasticMapReduceBackend(BaseBackend):
|
||||
self.job_flows = {}
|
||||
self.instance_groups = {}
|
||||
|
||||
def run_job_flow(self, name, log_uri, steps, instance_attrs):
|
||||
def run_job_flow(self, name, log_uri, job_flow_role, steps, instance_attrs):
|
||||
job_id = random_job_id()
|
||||
job_flow = FakeJobFlow(job_id, name, log_uri, steps, instance_attrs)
|
||||
job_flow = FakeJobFlow(job_id, name, log_uri, job_flow_role, steps, instance_attrs)
|
||||
self.job_flows[job_id] = job_flow
|
||||
return job_flow
|
||||
|
||||
|
@ -50,8 +50,9 @@ class ElasticMapReduceResponse(BaseResponse):
|
||||
log_uri = self._get_param('LogUri')
|
||||
steps = self._get_list_prefix('Steps.member')
|
||||
instance_attrs = self._get_dict_param('Instances.')
|
||||
job_flow_role = self._get_param('JobFlowRole')
|
||||
|
||||
job_flow = emr_backend.run_job_flow(flow_name, log_uri, steps, instance_attrs)
|
||||
job_flow = emr_backend.run_job_flow(flow_name, log_uri, job_flow_role, steps, instance_attrs)
|
||||
template = Template(RUN_JOB_FLOW_TEMPLATE)
|
||||
return template.render(job_flow=job_flow)
|
||||
|
||||
@ -102,6 +103,7 @@ DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elastic
|
||||
<State>{{ job_flow.state }}</State>
|
||||
</ExecutionStatusDetail>
|
||||
<Name>{{ job_flow.name }}</Name>
|
||||
<JobFlowRole>{{ job_flow.role }}</JobFlowRole>
|
||||
<LogUri>{{ job_flow.log_uri }}</LogUri>
|
||||
<Steps>
|
||||
{% for step in job_flow.steps %}
|
||||
|
@ -31,7 +31,8 @@ def test_create_job_flow():
|
||||
log_uri='s3://some_bucket/jobflow_logs',
|
||||
master_instance_type='m1.medium',
|
||||
slave_instance_type='m1.small',
|
||||
steps=[step1, step2]
|
||||
job_flow_role='some-role-arn',
|
||||
steps=[step1, step2],
|
||||
)
|
||||
|
||||
job_flow = conn.describe_jobflow(job_id)
|
||||
@ -169,7 +170,7 @@ def test_create_instance_groups():
|
||||
job_id = conn.run_jobflow(
|
||||
name='My jobflow',
|
||||
log_uri='s3://some_bucket/jobflow_logs',
|
||||
steps=[step1]
|
||||
steps=[step1],
|
||||
)
|
||||
|
||||
instance_group = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')
|
||||
|
Loading…
Reference in New Issue
Block a user