diff --git a/moto/emr/__init__.py b/moto/emr/__init__.py index ad7c75964..f0103319a 100644 --- a/moto/emr/__init__.py +++ b/moto/emr/__init__.py @@ -1,3 +1,12 @@ from __future__ import unicode_literals -from .models import emr_backend -mock_emr = emr_backend.decorator +from .models import emr_backends +from ..core.models import MockAWS + +emr_backend = emr_backends['us-east-1'] + + +def mock_emr(func=None): + if func: + return MockAWS(emr_backends)(func) + else: + return MockAWS(emr_backends) diff --git a/moto/emr/models.py b/moto/emr/models.py index 5193a1986..353c968ec 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -1,7 +1,9 @@ from __future__ import unicode_literals + +import boto.emr from moto.core import BaseBackend -from .utils import random_job_id, random_instance_group_id +from .utils import random_instance_group_id, random_job_id DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault' @@ -80,7 +82,7 @@ class FakeStep(object): class FakeJobFlow(object): - def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs): + def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, emr_backend): self.id = job_id self.name = name self.log_uri = log_uri @@ -103,6 +105,8 @@ class FakeJobFlow(object): self.instance_group_ids = [] + self.emr_backend = emr_backend + def create_cluster(self): cluster = Cluster( id=self.id, @@ -124,7 +128,6 @@ class FakeJobFlow(object): else: self.visible_to_all_users = False - def set_termination_protection(self, value): self.termination_protected = value @@ -141,7 +144,7 @@ class FakeJobFlow(object): @property def instance_groups(self): - return emr_backend.get_instance_groups(self.instance_group_ids) + return self.emr_backend.get_instance_groups(self.instance_group_ids) @property def master_instance_type(self): @@ -180,7 +183,8 @@ class ElasticMapReduceBackend(BaseBackend): def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs): job_id = random_job_id() - job_flow = FakeJobFlow(job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs) + job_flow = FakeJobFlow( + job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, self) self.job_flows[job_id] = job_flow cluster = job_flow.create_cluster() self.clusters[cluster.id] = cluster @@ -254,4 +258,6 @@ class ElasticMapReduceBackend(BaseBackend): cluster.remove_tags(tag_keys) -emr_backend = ElasticMapReduceBackend() +emr_backends = {} +for region in boto.emr.regions(): + emr_backends[region.name] = ElasticMapReduceBackend() diff --git a/moto/emr/responses.py b/moto/emr/responses.py index 926704c10..f7b149345 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -1,17 +1,21 @@ from __future__ import unicode_literals from moto.core.responses import BaseResponse -from .models import emr_backend +from .models import emr_backends from .utils import tags_from_query_string class ElasticMapReduceResponse(BaseResponse): + @property + def backend(self): + return emr_backends[self.region] + def add_job_flow_steps(self): job_flow_id = self._get_param('JobFlowId') steps = self._get_list_prefix('Steps.member') - job_flow = emr_backend.add_job_flow_steps(job_flow_id, steps) + job_flow = self.backend.add_job_flow_steps(job_flow_id, steps) template = self.response_template(ADD_JOB_FLOW_STEPS_TEMPLATE) return template.render(job_flow=job_flow) @@ -23,78 +27,78 @@ class ElasticMapReduceResponse(BaseResponse): job_flow_role = self._get_param('JobFlowRole') visible_to_all_users = self._get_param('VisibleToAllUsers') - job_flow = emr_backend.run_job_flow( + job_flow = self.backend.run_job_flow( flow_name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs ) instance_groups = self._get_list_prefix('Instances.InstanceGroups.member') if instance_groups: - emr_backend.add_instance_groups(job_flow.id, instance_groups) + self.backend.add_instance_groups(job_flow.id, instance_groups) template = self.response_template(RUN_JOB_FLOW_TEMPLATE) return template.render(job_flow=job_flow) def describe_job_flows(self): job_flow_ids = self._get_multi_param("JobFlowIds.member") - job_flows = emr_backend.describe_job_flows(job_flow_ids) + job_flows = self.backend.describe_job_flows(job_flow_ids) template = self.response_template(DESCRIBE_JOB_FLOWS_TEMPLATE) return template.render(job_flows=job_flows) def terminate_job_flows(self): job_ids = self._get_multi_param('JobFlowIds.member.') - job_flows = emr_backend.terminate_job_flows(job_ids) + job_flows = self.backend.terminate_job_flows(job_ids) template = self.response_template(TERMINATE_JOB_FLOWS_TEMPLATE) return template.render(job_flows=job_flows) def add_instance_groups(self): jobflow_id = self._get_param('JobFlowId') instance_groups = self._get_list_prefix('InstanceGroups.member') - instance_groups = emr_backend.add_instance_groups(jobflow_id, instance_groups) + instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups) template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups) def modify_instance_groups(self): instance_groups = self._get_list_prefix('InstanceGroups.member') - instance_groups = emr_backend.modify_instance_groups(instance_groups) + instance_groups = self.backend.modify_instance_groups(instance_groups) template = self.response_template(MODIFY_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups) def set_visible_to_all_users(self): visible_to_all_users = self._get_param('VisibleToAllUsers') job_ids = self._get_multi_param('JobFlowIds.member') - emr_backend.set_visible_to_all_users(job_ids, visible_to_all_users) + self.backend.set_visible_to_all_users(job_ids, visible_to_all_users) template = self.response_template(SET_VISIBLE_TO_ALL_USERS_TEMPLATE) return template.render() def set_termination_protection(self): termination_protection = self._get_param('TerminationProtected') job_ids = self._get_multi_param('JobFlowIds.member') - emr_backend.set_termination_protection(job_ids, termination_protection) + self.backend.set_termination_protection(job_ids, termination_protection) template = self.response_template(SET_TERMINATION_PROTECTION_TEMPLATE) return template.render() def list_clusters(self): - clusters = emr_backend.list_clusters() + clusters = self.backend.list_clusters() template = self.response_template(LIST_CLUSTERS_TEMPLATE) return template.render(clusters=clusters) def describe_cluster(self): cluster_id = self._get_param('ClusterId') - cluster = emr_backend.get_cluster(cluster_id) + cluster = self.backend.get_cluster(cluster_id) template = self.response_template(DESCRIBE_CLUSTER_TEMPLATE) return template.render(cluster=cluster) def add_tags(self): cluster_id = self._get_param('ResourceId') tags = tags_from_query_string(self.querystring) - emr_backend.add_tags(cluster_id, tags) + self.backend.add_tags(cluster_id, tags) template = self.response_template(ADD_TAGS_TEMPLATE) return template.render() def remove_tags(self): cluster_id = self._get_param('ResourceId') tag_keys = self._get_multi_param('TagKeys.member') - emr_backend.remove_tags(cluster_id, tag_keys) + self.backend.remove_tags(cluster_id, tag_keys) template = self.response_template(REMOVE_TAGS_TEMPLATE) return template.render() diff --git a/tests/test_emr/test_emr.py b/tests/test_emr/test_emr.py index 6667f04db..61090054a 100644 --- a/tests/test_emr/test_emr.py +++ b/tests/test_emr/test_emr.py @@ -10,6 +10,40 @@ from moto import mock_emr from tests.helpers import requires_boto_gte +@mock_emr +def test_create_job_flow_in_multiple_regions(): + step = StreamingStep( + name='My wordcount example', + mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py', + reducer='aggregate', + input='s3n://elasticmapreduce/samples/wordcount/input', + output='s3n://output_bucket/output/wordcount_output' + ) + + west1_conn = boto.emr.connect_to_region('us-east-1') + west1_job_id = west1_conn.run_jobflow( + name='us-east-1', + log_uri='s3://some_bucket/jobflow_logs', + master_instance_type='m1.medium', + slave_instance_type='m1.small', + steps=[step], + ) + + west2_conn = boto.emr.connect_to_region('eu-west-1') + west2_job_id = west2_conn.run_jobflow( + name='eu-west-1', + log_uri='s3://some_bucket/jobflow_logs', + master_instance_type='m1.medium', + slave_instance_type='m1.small', + steps=[step], + ) + + west1_job_flow = west1_conn.describe_jobflow(west1_job_id) + west1_job_flow.name.should.equal('us-east-1') + west2_job_flow = west2_conn.describe_jobflow(west2_job_id) + west2_job_flow.name.should.equal('eu-west-1') + + @mock_emr def test_create_job_flow(): conn = boto.connect_emr() @@ -129,7 +163,6 @@ def test_create_job_flow_with_instance_groups(): int(instance_group.instancerunningcount).should.equal(6) - @mock_emr def test_terminate_job_flow(): conn = boto.connect_emr()