Merge pull request #375 from achiku/emr-multi-region
Make EMR multi region
This commit is contained in:
commit
4db141202e
@ -1,3 +1,12 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
from .models import emr_backend
|
from .models import emr_backends
|
||||||
mock_emr = emr_backend.decorator
|
from ..core.models import MockAWS
|
||||||
|
|
||||||
|
emr_backend = emr_backends['us-east-1']
|
||||||
|
|
||||||
|
|
||||||
|
def mock_emr(func=None):
|
||||||
|
if func:
|
||||||
|
return MockAWS(emr_backends)(func)
|
||||||
|
else:
|
||||||
|
return MockAWS(emr_backends)
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import boto.emr
|
||||||
from moto.core import BaseBackend
|
from moto.core import BaseBackend
|
||||||
|
|
||||||
from .utils import random_job_id, random_instance_group_id
|
from .utils import random_instance_group_id, random_job_id
|
||||||
|
|
||||||
DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault'
|
DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault'
|
||||||
|
|
||||||
@ -80,7 +82,7 @@ class FakeStep(object):
|
|||||||
|
|
||||||
|
|
||||||
class FakeJobFlow(object):
|
class FakeJobFlow(object):
|
||||||
def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
|
def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, emr_backend):
|
||||||
self.id = job_id
|
self.id = job_id
|
||||||
self.name = name
|
self.name = name
|
||||||
self.log_uri = log_uri
|
self.log_uri = log_uri
|
||||||
@ -103,6 +105,8 @@ class FakeJobFlow(object):
|
|||||||
|
|
||||||
self.instance_group_ids = []
|
self.instance_group_ids = []
|
||||||
|
|
||||||
|
self.emr_backend = emr_backend
|
||||||
|
|
||||||
def create_cluster(self):
|
def create_cluster(self):
|
||||||
cluster = Cluster(
|
cluster = Cluster(
|
||||||
id=self.id,
|
id=self.id,
|
||||||
@ -124,7 +128,6 @@ class FakeJobFlow(object):
|
|||||||
else:
|
else:
|
||||||
self.visible_to_all_users = False
|
self.visible_to_all_users = False
|
||||||
|
|
||||||
|
|
||||||
def set_termination_protection(self, value):
|
def set_termination_protection(self, value):
|
||||||
self.termination_protected = value
|
self.termination_protected = value
|
||||||
|
|
||||||
@ -141,7 +144,7 @@ class FakeJobFlow(object):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def instance_groups(self):
|
def instance_groups(self):
|
||||||
return emr_backend.get_instance_groups(self.instance_group_ids)
|
return self.emr_backend.get_instance_groups(self.instance_group_ids)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def master_instance_type(self):
|
def master_instance_type(self):
|
||||||
@ -180,7 +183,8 @@ class ElasticMapReduceBackend(BaseBackend):
|
|||||||
|
|
||||||
def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
|
def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
|
||||||
job_id = random_job_id()
|
job_id = random_job_id()
|
||||||
job_flow = FakeJobFlow(job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs)
|
job_flow = FakeJobFlow(
|
||||||
|
job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, self)
|
||||||
self.job_flows[job_id] = job_flow
|
self.job_flows[job_id] = job_flow
|
||||||
cluster = job_flow.create_cluster()
|
cluster = job_flow.create_cluster()
|
||||||
self.clusters[cluster.id] = cluster
|
self.clusters[cluster.id] = cluster
|
||||||
@ -254,4 +258,6 @@ class ElasticMapReduceBackend(BaseBackend):
|
|||||||
cluster.remove_tags(tag_keys)
|
cluster.remove_tags(tag_keys)
|
||||||
|
|
||||||
|
|
||||||
emr_backend = ElasticMapReduceBackend()
|
emr_backends = {}
|
||||||
|
for region in boto.emr.regions():
|
||||||
|
emr_backends[region.name] = ElasticMapReduceBackend()
|
||||||
|
@ -1,17 +1,21 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
from .models import emr_backend
|
from .models import emr_backends
|
||||||
from .utils import tags_from_query_string
|
from .utils import tags_from_query_string
|
||||||
|
|
||||||
|
|
||||||
class ElasticMapReduceResponse(BaseResponse):
|
class ElasticMapReduceResponse(BaseResponse):
|
||||||
|
|
||||||
|
@property
|
||||||
|
def backend(self):
|
||||||
|
return emr_backends[self.region]
|
||||||
|
|
||||||
def add_job_flow_steps(self):
|
def add_job_flow_steps(self):
|
||||||
job_flow_id = self._get_param('JobFlowId')
|
job_flow_id = self._get_param('JobFlowId')
|
||||||
steps = self._get_list_prefix('Steps.member')
|
steps = self._get_list_prefix('Steps.member')
|
||||||
|
|
||||||
job_flow = emr_backend.add_job_flow_steps(job_flow_id, steps)
|
job_flow = self.backend.add_job_flow_steps(job_flow_id, steps)
|
||||||
template = self.response_template(ADD_JOB_FLOW_STEPS_TEMPLATE)
|
template = self.response_template(ADD_JOB_FLOW_STEPS_TEMPLATE)
|
||||||
return template.render(job_flow=job_flow)
|
return template.render(job_flow=job_flow)
|
||||||
|
|
||||||
@ -23,78 +27,78 @@ class ElasticMapReduceResponse(BaseResponse):
|
|||||||
job_flow_role = self._get_param('JobFlowRole')
|
job_flow_role = self._get_param('JobFlowRole')
|
||||||
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
||||||
|
|
||||||
job_flow = emr_backend.run_job_flow(
|
job_flow = self.backend.run_job_flow(
|
||||||
flow_name, log_uri, job_flow_role,
|
flow_name, log_uri, job_flow_role,
|
||||||
visible_to_all_users, steps, instance_attrs
|
visible_to_all_users, steps, instance_attrs
|
||||||
)
|
)
|
||||||
instance_groups = self._get_list_prefix('Instances.InstanceGroups.member')
|
instance_groups = self._get_list_prefix('Instances.InstanceGroups.member')
|
||||||
if instance_groups:
|
if instance_groups:
|
||||||
emr_backend.add_instance_groups(job_flow.id, instance_groups)
|
self.backend.add_instance_groups(job_flow.id, instance_groups)
|
||||||
|
|
||||||
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
|
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
|
||||||
return template.render(job_flow=job_flow)
|
return template.render(job_flow=job_flow)
|
||||||
|
|
||||||
def describe_job_flows(self):
|
def describe_job_flows(self):
|
||||||
job_flow_ids = self._get_multi_param("JobFlowIds.member")
|
job_flow_ids = self._get_multi_param("JobFlowIds.member")
|
||||||
job_flows = emr_backend.describe_job_flows(job_flow_ids)
|
job_flows = self.backend.describe_job_flows(job_flow_ids)
|
||||||
template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE)
|
template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE)
|
||||||
return template.render(job_flows=job_flows)
|
return template.render(job_flows=job_flows)
|
||||||
|
|
||||||
def terminate_job_flows(self):
|
def terminate_job_flows(self):
|
||||||
job_ids = self._get_multi_param('JobFlowIds.member.')
|
job_ids = self._get_multi_param('JobFlowIds.member.')
|
||||||
job_flows = emr_backend.terminate_job_flows(job_ids)
|
job_flows = self.backend.terminate_job_flows(job_ids)
|
||||||
template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE)
|
template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE)
|
||||||
return template.render(job_flows=job_flows)
|
return template.render(job_flows=job_flows)
|
||||||
|
|
||||||
def add_instance_groups(self):
|
def add_instance_groups(self):
|
||||||
jobflow_id = self._get_param('JobFlowId')
|
jobflow_id = self._get_param('JobFlowId')
|
||||||
instance_groups = self._get_list_prefix('InstanceGroups.member')
|
instance_groups = self._get_list_prefix('InstanceGroups.member')
|
||||||
instance_groups = emr_backend.add_instance_groups(jobflow_id, instance_groups)
|
instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups)
|
||||||
template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE)
|
template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE)
|
||||||
return template.render(instance_groups=instance_groups)
|
return template.render(instance_groups=instance_groups)
|
||||||
|
|
||||||
def modify_instance_groups(self):
|
def modify_instance_groups(self):
|
||||||
instance_groups = self._get_list_prefix('InstanceGroups.member')
|
instance_groups = self._get_list_prefix('InstanceGroups.member')
|
||||||
instance_groups = emr_backend.modify_instance_groups(instance_groups)
|
instance_groups = self.backend.modify_instance_groups(instance_groups)
|
||||||
template = self.response_template(MODIFY_INSTANCE_GROUPS_TEMPLATE)
|
template = self.response_template(MODIFY_INSTANCE_GROUPS_TEMPLATE)
|
||||||
return template.render(instance_groups=instance_groups)
|
return template.render(instance_groups=instance_groups)
|
||||||
|
|
||||||
def set_visible_to_all_users(self):
|
def set_visible_to_all_users(self):
|
||||||
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
visible_to_all_users = self._get_param('VisibleToAllUsers')
|
||||||
job_ids = self._get_multi_param('JobFlowIds.member')
|
job_ids = self._get_multi_param('JobFlowIds.member')
|
||||||
emr_backend.set_visible_to_all_users(job_ids, visible_to_all_users)
|
self.backend.set_visible_to_all_users(job_ids, visible_to_all_users)
|
||||||
template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
def set_termination_protection(self):
|
def set_termination_protection(self):
|
||||||
termination_protection = self._get_param('TerminationProtected')
|
termination_protection = self._get_param('TerminationProtected')
|
||||||
job_ids = self._get_multi_param('JobFlowIds.member')
|
job_ids = self._get_multi_param('JobFlowIds.member')
|
||||||
emr_backend.set_termination_protection(job_ids, termination_protection)
|
self.backend.set_termination_protection(job_ids, termination_protection)
|
||||||
template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE)
|
template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
def list_clusters(self):
|
def list_clusters(self):
|
||||||
clusters = emr_backend.list_clusters()
|
clusters = self.backend.list_clusters()
|
||||||
template = self.response_template(LIST_CLUSTERS_TEMPLATE)
|
template = self.response_template(LIST_CLUSTERS_TEMPLATE)
|
||||||
return template.render(clusters=clusters)
|
return template.render(clusters=clusters)
|
||||||
|
|
||||||
def describe_cluster(self):
|
def describe_cluster(self):
|
||||||
cluster_id = self._get_param('ClusterId')
|
cluster_id = self._get_param('ClusterId')
|
||||||
cluster = emr_backend.get_cluster(cluster_id)
|
cluster = self.backend.get_cluster(cluster_id)
|
||||||
template = self.response_template(DESCRIBE_CLUSTER_TEMPLATE)
|
template = self.response_template(DESCRIBE_CLUSTER_TEMPLATE)
|
||||||
return template.render(cluster=cluster)
|
return template.render(cluster=cluster)
|
||||||
|
|
||||||
def add_tags(self):
|
def add_tags(self):
|
||||||
cluster_id = self._get_param('ResourceId')
|
cluster_id = self._get_param('ResourceId')
|
||||||
tags = tags_from_query_string(self.querystring)
|
tags = tags_from_query_string(self.querystring)
|
||||||
emr_backend.add_tags(cluster_id, tags)
|
self.backend.add_tags(cluster_id, tags)
|
||||||
template = self.response_template(ADD_TAGS_TEMPLATE)
|
template = self.response_template(ADD_TAGS_TEMPLATE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
def remove_tags(self):
|
def remove_tags(self):
|
||||||
cluster_id = self._get_param('ResourceId')
|
cluster_id = self._get_param('ResourceId')
|
||||||
tag_keys = self._get_multi_param('TagKeys.member')
|
tag_keys = self._get_multi_param('TagKeys.member')
|
||||||
emr_backend.remove_tags(cluster_id, tag_keys)
|
self.backend.remove_tags(cluster_id, tag_keys)
|
||||||
template = self.response_template(REMOVE_TAGS_TEMPLATE)
|
template = self.response_template(REMOVE_TAGS_TEMPLATE)
|
||||||
return template.render()
|
return template.render()
|
||||||
|
|
||||||
|
@ -10,6 +10,40 @@ from moto import mock_emr
|
|||||||
from tests.helpers import requires_boto_gte
|
from tests.helpers import requires_boto_gte
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
|
||||||
|
def test_create_job_flow_in_multiple_regions():
|
||||||
|
step = StreamingStep(
|
||||||
|
name='My wordcount example',
|
||||||
|
mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
|
||||||
|
reducer='aggregate',
|
||||||
|
input='s3n://elasticmapreduce/samples/wordcount/input',
|
||||||
|
output='s3n://output_bucket/output/wordcount_output'
|
||||||
|
)
|
||||||
|
|
||||||
|
west1_conn = boto.emr.connect_to_region('us-east-1')
|
||||||
|
west1_job_id = west1_conn.run_jobflow(
|
||||||
|
name='us-east-1',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
master_instance_type='m1.medium',
|
||||||
|
slave_instance_type='m1.small',
|
||||||
|
steps=[step],
|
||||||
|
)
|
||||||
|
|
||||||
|
west2_conn = boto.emr.connect_to_region('eu-west-1')
|
||||||
|
west2_job_id = west2_conn.run_jobflow(
|
||||||
|
name='eu-west-1',
|
||||||
|
log_uri='s3://some_bucket/jobflow_logs',
|
||||||
|
master_instance_type='m1.medium',
|
||||||
|
slave_instance_type='m1.small',
|
||||||
|
steps=[step],
|
||||||
|
)
|
||||||
|
|
||||||
|
west1_job_flow = west1_conn.describe_jobflow(west1_job_id)
|
||||||
|
west1_job_flow.name.should.equal('us-east-1')
|
||||||
|
west2_job_flow = west2_conn.describe_jobflow(west2_job_id)
|
||||||
|
west2_job_flow.name.should.equal('eu-west-1')
|
||||||
|
|
||||||
|
|
||||||
@mock_emr
|
@mock_emr
|
||||||
def test_create_job_flow():
|
def test_create_job_flow():
|
||||||
conn = boto.connect_emr()
|
conn = boto.connect_emr()
|
||||||
@ -129,7 +163,6 @@ def test_create_job_flow_with_instance_groups():
|
|||||||
int(instance_group.instancerunningcount).should.equal(6)
|
int(instance_group.instancerunningcount).should.equal(6)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@mock_emr
|
@mock_emr
|
||||||
def test_terminate_job_flow():
|
def test_terminate_job_flow():
|
||||||
conn = boto.connect_emr()
|
conn = boto.connect_emr()
|
||||||
|
Loading…
Reference in New Issue
Block a user