Merge pull request #373 from crosswise/master
feature emr: set_termination_protection and instance_groups param in run_jobflow constructor
This commit is contained in:
commit
f487620c27
@ -124,6 +124,10 @@ class FakeJobFlow(object):
|
|||||||
else:
|
else:
|
||||||
self.visible_to_all_users = False
|
self.visible_to_all_users = False
|
||||||
|
|
||||||
|
|
||||||
|
def set_termination_protection(self, value):
|
||||||
|
self.termination_protected = value
|
||||||
|
|
||||||
def add_steps(self, steps):
|
def add_steps(self, steps):
|
||||||
for index, step in enumerate(steps):
|
for index, step in enumerate(steps):
|
||||||
if self.steps:
|
if self.steps:
|
||||||
@ -236,6 +240,11 @@ class ElasticMapReduceBackend(BaseBackend):
|
|||||||
job = self.job_flows[job_id]
|
job = self.job_flows[job_id]
|
||||||
job.set_visibility(visible_to_all_users)
|
job.set_visibility(visible_to_all_users)
|
||||||
|
|
||||||
|
def set_termination_protection(self, job_ids, value):
|
||||||
|
for job_id in job_ids:
|
||||||
|
job = self.job_flows[job_id]
|
||||||
|
job.set_termination_protection(value)
|
||||||
|
|
||||||
def add_tags(self, cluster_id, tags):
|
def add_tags(self, cluster_id, tags):
|
||||||
cluster = self.get_cluster(cluster_id)
|
cluster = self.get_cluster(cluster_id)
|
||||||
cluster.add_tags(tags)
|
cluster.add_tags(tags)
|
||||||
|
@ -27,6 +27,10 @@ class ElasticMapReduceResponse(BaseResponse):
|
|||||||
flow_name, log_uri, job_flow_role,
|
flow_name, log_uri, job_flow_role,
|
||||||
visible_to_all_users, steps, instance_attrs
|
visible_to_all_users, steps, instance_attrs
|
||||||
)
|
)
|
||||||
|
instance_groups = self._get_list_prefix('Instances.InstanceGroups.member')
|
||||||
|
if instance_groups:
|
||||||
|
emr_backend.add_instance_groups(job_flow.id, instance_groups)
|
||||||
|
|
||||||
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
|
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
|
||||||
return template.render(job_flow=job_flow)
|
return template.render(job_flow=job_flow)
|
||||||
|
|
||||||
@ -62,6 +66,13 @@ class ElasticMapReduceResponse(BaseResponse):
|
|||||||
template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
|
def set_termination_protection(self):
|
||||||
|
termination_protection = self._get_param('TerminationProtected')
|
||||||
|
job_ids = self._get_multi_param('JobFlowIds.member')
|
||||||
|
emr_backend.set_termination_protection(job_ids, termination_protection)
|
||||||
|
template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE)
|
||||||
|
return template.render()
|
||||||
|
|
||||||
def list_clusters(self):
|
def list_clusters(self):
|
||||||
clusters = emr_backend.list_clusters()
|
clusters = emr_backend.list_clusters()
|
||||||
template = self.response_template(LIST_CLUSTERS_TEMPLATE)
|
template = self.response_template(LIST_CLUSTERS_TEMPLATE)
|
||||||
@ -149,6 +160,7 @@ DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elastic
|
|||||||
<InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
|
<InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
|
||||||
<KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
|
<KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
|
||||||
<TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
|
<TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
|
||||||
|
<MasterPublicDnsName>ec2-184-0-0-1.us-west-1.compute.amazonaws.com</MasterPublicDnsName>
|
||||||
<InstanceGroups>
|
<InstanceGroups>
|
||||||
{% for instance_group in job_flow.instance_groups %}
|
{% for instance_group in job_flow.instance_groups %}
|
||||||
<member>
|
<member>
|
||||||
@ -290,6 +302,14 @@ SET_VISIBLE_TO_ALL_USERS_TEMPLATE = """<SetVisibleToAllUsersResponse xmlns="http
|
|||||||
</SetVisibleToAllUsersResponse>"""
|
</SetVisibleToAllUsersResponse>"""
|
||||||
|
|
||||||
|
|
||||||
|
SET_TERMINATION_PROTECTION_TEMPLATE = """<SetTerminationProtection xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>
|
||||||
|
2690d7eb-ed86-11dd-9877-6fad448a8419
|
||||||
|
</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</SetTerminationProtection>"""
|
||||||
|
|
||||||
ADD_TAGS_TEMPLATE = """<AddTagsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
ADD_TAGS_TEMPLATE = """<AddTagsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
|
||||||
<ResponseMetadata>
|
<ResponseMetadata>
|
||||||
<RequestId>
|
<RequestId>
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import boto
|
import boto
|
||||||
from boto.emr.instance_group import InstanceGroup
|
from boto.emr.instance_group import InstanceGroup
|
||||||
|
|
||||||
from boto.emr.step import StreamingStep
|
from boto.emr.step import StreamingStep
|
||||||
import sure # noqa
|
import sure # noqa
|
||||||
|
|
||||||
@ -107,6 +109,27 @@ def test_create_job_flow_visible_to_all_users():
|
|||||||
job_flow.visibletoallusers.should.equal('True')
|
job_flow.visibletoallusers.should.equal('True')
|
||||||
|
|
||||||
|
|
||||||
|
@requires_boto_gte("2.8")
|
||||||
|
@mock_emr
|
||||||
|
def test_create_job_flow_with_instance_groups():
|
||||||
|
conn = boto.connect_emr()
|
||||||
|
|
||||||
|
instance_groups = [InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07'),
|
||||||
|
InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')]
|
||||||
|
job_id = conn.run_jobflow(
|
||||||
|
name='My jobflow',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
steps=[],
|
||||||
|
instance_groups=instance_groups
|
||||||
|
)
|
||||||
|
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
int(job_flow.instancecount).should.equal(12)
|
||||||
|
instance_group = job_flow.instancegroups[0]
|
||||||
|
int(instance_group.instancerunningcount).should.equal(6)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@mock_emr
|
@mock_emr
|
||||||
def test_terminate_job_flow():
|
def test_terminate_job_flow():
|
||||||
conn = boto.connect_emr()
|
conn = boto.connect_emr()
|
||||||
@ -318,6 +341,30 @@ def test_set_visible_to_all_users():
|
|||||||
job_flow.visibletoallusers.should.equal('False')
|
job_flow.visibletoallusers.should.equal('False')
|
||||||
|
|
||||||
|
|
||||||
|
@requires_boto_gte("2.8")
|
||||||
|
@mock_emr
|
||||||
|
def test_set_termination_protection():
|
||||||
|
conn = boto.connect_emr()
|
||||||
|
|
||||||
|
job_id = conn.run_jobflow(
|
||||||
|
name='My jobflow',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
steps=[]
|
||||||
|
)
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.terminationprotected.should.equal(u'None')
|
||||||
|
|
||||||
|
conn.set_termination_protection(job_id, True)
|
||||||
|
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.terminationprotected.should.equal('true')
|
||||||
|
|
||||||
|
conn.set_termination_protection(job_id, False)
|
||||||
|
|
||||||
|
job_flow = conn.describe_jobflow(job_id)
|
||||||
|
job_flow.terminationprotected.should.equal('false')
|
||||||
|
|
||||||
|
|
||||||
@mock_emr
|
@mock_emr
|
||||||
def test_list_clusters():
|
def test_list_clusters():
|
||||||
conn = boto.connect_emr()
|
conn = boto.connect_emr()
|
||||||
|
Loading…
Reference in New Issue
Block a user