Add visibletoallusers and normalizedinstancehours to EMR
This commit is contained in:
parent
1f83e6feea
commit
62b72377bd
@ -50,7 +50,7 @@ class FakeStep(object):
|
|||||||
|
|
||||||
|
|
||||||
class FakeJobFlow(object):
|
class FakeJobFlow(object):
|
||||||
def __init__(self, job_id, name, log_uri, job_flow_role, steps, instance_attrs):
|
def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
|
||||||
self.id = job_id
|
self.id = job_id
|
||||||
self.name = name
|
self.name = name
|
||||||
self.log_uri = log_uri
|
self.log_uri = log_uri
|
||||||
@ -63,6 +63,8 @@ class FakeJobFlow(object):
|
|||||||
self.initial_master_instance_type = instance_attrs.get('master_instance_type')
|
self.initial_master_instance_type = instance_attrs.get('master_instance_type')
|
||||||
self.initial_slave_instance_type = instance_attrs.get('slave_instance_type')
|
self.initial_slave_instance_type = instance_attrs.get('slave_instance_type')
|
||||||
|
|
||||||
|
self.set_visibility(visible_to_all_users)
|
||||||
|
self.normalized_instance_hours = 0
|
||||||
self.ec2_key_name = instance_attrs.get('ec2_key_name')
|
self.ec2_key_name = instance_attrs.get('ec2_key_name')
|
||||||
self.availability_zone = instance_attrs.get('placement.availability_zone')
|
self.availability_zone = instance_attrs.get('placement.availability_zone')
|
||||||
self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
|
self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
|
||||||
@ -73,6 +75,12 @@ class FakeJobFlow(object):
|
|||||||
def terminate(self):
|
def terminate(self):
|
||||||
self.state = 'TERMINATED'
|
self.state = 'TERMINATED'
|
||||||
|
|
||||||
|
def set_visibility(self, visibility):
|
||||||
|
if visibility == 'true':
|
||||||
|
self.visible_to_all_users = True
|
||||||
|
else:
|
||||||
|
self.visible_to_all_users = False
|
||||||
|
|
||||||
def add_steps(self, steps):
|
def add_steps(self, steps):
|
||||||
for index, step in enumerate(steps):
|
for index, step in enumerate(steps):
|
||||||
if self.steps:
|
if self.steps:
|
||||||
@ -122,9 +130,9 @@ class ElasticMapReduceBackend(BaseBackend):
|
|||||||
self.job_flows = {}
|
self.job_flows = {}
|
||||||
self.instance_groups = {}
|
self.instance_groups = {}
|
||||||
|
|
||||||
def run_job_flow(self, name, log_uri, job_flow_role, steps, instance_attrs):
|
def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
|
||||||
job_id = random_job_id()
|
job_id = random_job_id()
|
||||||
job_flow = FakeJobFlow(job_id, name, log_uri, job_flow_role, steps, instance_attrs)
|
job_flow = FakeJobFlow(job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs)
|
||||||
self.job_flows[job_id] = job_flow
|
self.job_flows[job_id] = job_flow
|
||||||
return job_flow
|
return job_flow
|
||||||
|
|
||||||
@ -167,4 +175,10 @@ class ElasticMapReduceBackend(BaseBackend):
|
|||||||
group.set_instance_count(instance_group['instance_count'])
|
group.set_instance_count(instance_group['instance_count'])
|
||||||
return result_groups
|
return result_groups
|
||||||
|
|
||||||
|
def set_visible_to_all_users(self, job_ids, visible_to_all_users):
|
||||||
|
for job_id in job_ids:
|
||||||
|
job = self.job_flows[job_id]
|
||||||
|
job.set_visibility(visible_to_all_users)
|
||||||
|
|
||||||
|
|
||||||
emr_backend = ElasticMapReduceBackend()
|
emr_backend = ElasticMapReduceBackend()
|
||||||
|
@ -51,8 +51,12 @@ class ElasticMapReduceResponse(BaseResponse):
|
|||||||
steps = self._get_list_prefix('Steps.member')
|
steps = self._get_list_prefix('Steps.member')
|
||||||
instance_attrs = self._get_dict_param('Instances.')
|
instance_attrs = self._get_dict_param('Instances.')
|
||||||
job_flow_role = self._get_param('JobFlowRole')
|
job_flow_role = self._get_param('JobFlowRole')
|
||||||
|
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
||||||
|
|
||||||
job_flow = emr_backend.run_job_flow(flow_name, log_uri, job_flow_role, steps, instance_attrs)
|
job_flow = emr_backend.run_job_flow(
|
||||||
|
flow_name, log_uri, job_flow_role,
|
||||||
|
visible_to_all_users, steps, instance_attrs
|
||||||
|
)
|
||||||
template = Template(RUN_JOB_FLOW_TEMPLATE)
|
template = Template(RUN_JOB_FLOW_TEMPLATE)
|
||||||
return template.render(job_flow=job_flow)
|
return template.render(job_flow=job_flow)
|
||||||
|
|
||||||
@ -80,6 +84,13 @@ class ElasticMapReduceResponse(BaseResponse):
|
|||||||
template = Template(MODIFY_INSTANCE_GROUPS_TEMPLATE)
|
template = Template(MODIFY_INSTANCE_GROUPS_TEMPLATE)
|
||||||
return template.render(instance_groups=instance_groups)
|
return template.render(instance_groups=instance_groups)
|
||||||
|
|
||||||
|
def set_visible_to_all_users(self):
|
||||||
|
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
||||||
|
job_ids = self._get_multi_param('JobFlowIds.member')
|
||||||
|
emr_backend.set_visible_to_all_users(job_ids, visible_to_all_users)
|
||||||
|
template = Template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
||||||
|
return template.render()
|
||||||
|
|
||||||
|
|
||||||
RUN_JOB_FLOW_TEMPLATE = """<RunJobFlowResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
RUN_JOB_FLOW_TEMPLATE = """<RunJobFlowResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||||
<RunJobFlowResult>
|
<RunJobFlowResult>
|
||||||
@ -137,6 +148,8 @@ DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elastic
|
|||||||
<SlaveInstanceType>{{ job_flow.slave_instance_type }}</SlaveInstanceType>
|
<SlaveInstanceType>{{ job_flow.slave_instance_type }}</SlaveInstanceType>
|
||||||
<MasterInstanceType>{{ job_flow.master_instance_type }}</MasterInstanceType>
|
<MasterInstanceType>{{ job_flow.master_instance_type }}</MasterInstanceType>
|
||||||
<Ec2KeyName>{{ job_flow.ec2_key_name }}</Ec2KeyName>
|
<Ec2KeyName>{{ job_flow.ec2_key_name }}</Ec2KeyName>
|
||||||
|
<NormalizedInstanceHours>{{ job_flow.normalized_instance_hours }}</NormalizedInstanceHours>
|
||||||
|
<VisibleToAllUsers>{{ job_flow.visible_to_all_users }}</VisibleToAllUsers>
|
||||||
<InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
|
<InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
|
||||||
<KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
|
<KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
|
||||||
<TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
|
<TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
|
||||||
@ -192,3 +205,11 @@ MODIFY_INSTANCE_GROUPS_TEMPLATE = """<ModifyInstanceGroupsResponse xmlns="http:/
|
|||||||
</RequestId>
|
</RequestId>
|
||||||
</ResponseMetadata>
|
</ResponseMetadata>
|
||||||
</ModifyInstanceGroupsResponse>"""
|
</ModifyInstanceGroupsResponse>"""
|
||||||
|
|
||||||
|
SET_VISIBLE_TO_ALL_USERS_TEMPLATE = """<SetVisibleToAllUsersResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>
|
||||||
|
2690d7eb-ed86-11dd-9877-6fad448a8419
|
||||||
|
</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</SetVisibleToAllUsersResponse>"""
|
||||||
|
@ -42,6 +42,8 @@ def test_create_job_flow():
|
|||||||
job_flow.masterinstancetype.should.equal('m1.medium')
|
job_flow.masterinstancetype.should.equal('m1.medium')
|
||||||
job_flow.slaveinstancetype.should.equal('m1.small')
|
job_flow.slaveinstancetype.should.equal('m1.small')
|
||||||
job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')
|
job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')
|
||||||
|
job_flow.visibletoallusers.should.equal('False')
|
||||||
|
int(job_flow.normalizedinstancehours).should.equal(0)
|
||||||
job_step = job_flow.steps[0]
|
job_step = job_flow.steps[0]
|
||||||
job_step.name.should.equal('My wordcount example')
|
job_step.name.should.equal('My wordcount example')
|
||||||
job_step.state.should.equal('STARTING')
|
job_step.state.should.equal('STARTING')
|
||||||
@ -89,6 +91,21 @@ def test_create_job_flow_with_new_params():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
|
||||||
|
def test_create_job_flow_visible_to_all_users():
|
||||||
|
conn = boto.connect_emr()
|
||||||
|
|
||||||
|
job_id = conn.run_jobflow(
|
||||||
|
name='My jobflow',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
job_flow_role='some-role-arn',
|
||||||
|
steps=[],
|
||||||
|
visible_to_all_users=True,
|
||||||
|
)
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.visibletoallusers.should.equal('True')
|
||||||
|
|
||||||
|
|
||||||
@mock_emr
|
@mock_emr
|
||||||
def test_terminate_job_flow():
|
def test_terminate_job_flow():
|
||||||
conn = boto.connect_emr()
|
conn = boto.connect_emr()
|
||||||
@ -248,3 +265,28 @@ def test_modify_instance_groups():
|
|||||||
if group.instancegroupid == instance_group_ids[1]
|
if group.instancegroupid == instance_group_ids[1]
|
||||||
][0]
|
][0]
|
||||||
int(instance_group2.instancerunningcount).should.equal(3)
|
int(instance_group2.instancerunningcount).should.equal(3)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
|
||||||
|
def test_set_visible_to_all_users():
|
||||||
|
conn = boto.connect_emr()
|
||||||
|
|
||||||
|
job_id = conn.run_jobflow(
|
||||||
|
name='My jobflow',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
job_flow_role='some-role-arn',
|
||||||
|
steps=[],
|
||||||
|
visible_to_all_users=False,
|
||||||
|
)
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.visibletoallusers.should.equal('False')
|
||||||
|
|
||||||
|
conn.set_visible_to_all_users(job_id, True)
|
||||||
|
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.visibletoallusers.should.equal('True')
|
||||||
|
|
||||||
|
conn.set_visible_to_all_users(job_id, False)
|
||||||
|
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.visibletoallusers.should.equal('False')
|
||||||
|
Loading…
Reference in New Issue
Block a user