# Patch "basic emr done" (Steve Pulec, 2013-08-07): adds a mock EMR backend
# to moto (new package moto/emr plus tests; 634 insertions).
#
# --- moto/__init__.py: register the new mock alongside the existing ones ---
#     from .emr import mock_emr
#
# --- moto/emr/__init__.py ---
#     from .models import emr_backend
#     mock_emr = emr_backend.decorator
#
# --- moto/emr/models.py ---
from moto.core import BaseBackend

from .utils import random_job_id, random_instance_group_id


class FakeInstanceGroup(object):
    """In-memory stand-in for an EMR instance group.

    `instance_count` arrives as a string from the query string; it is stored
    as-is and coerced with int() where it is summed (FakeJobFlow.instance_count).
    """

    def __init__(self, id, instance_count, instance_role, instance_type, market, name, bid_price=None):
        self.id = id
        self.num_instances = instance_count
        self.role = instance_role
        self.type = instance_type
        self.market = market
        self.name = name
        self.bid_price = bid_price

    def set_instance_count(self, instance_count):
        # Resize the group in place (used by ModifyInstanceGroups).
        self.num_instances = instance_count


class FakeStep(object):
    """One job-flow step, parsed from flattened query-string kwargs, e.g.:

    'Steps.member.1.HadoopJarStep.Jar': ['/home/hadoop/contrib/streaming/hadoop-streaming.jar'],
    'Steps.member.1.HadoopJarStep.Args.member.1': ['-mapper'],
    'Steps.member.1.HadoopJarStep.Args.member.2': ['s3n://elasticmapreduce/samples/wordcount/wordSplitter.py'],
    ...
    'Steps.member.1.ActionOnFailure': ['TERMINATE_JOB_FLOW'],
    'Steps.member.1.Name': ['My wordcount example']
    """

    def __init__(self, state, **kwargs):
        self.action_on_failure = kwargs['action_on_failure']
        self.name = kwargs['name']
        self.jar = kwargs['hadoop_jar_step._jar']
        self.state = state

        # Collect 'hadoop_jar_step._args.member.N' for N = 1, 2, ... until the
        # sequence ends.  bugfix: the original tested `if arg:`, so a present
        # but falsy value (e.g. an empty string) would truncate the arg list;
        # only a missing key (None from .get()) should stop the scan.
        self.args = []
        arg_index = 1
        while True:
            arg = kwargs.get('hadoop_jar_step._args.member.{}'.format(arg_index))
            if arg is None:
                break
            self.args.append(arg)
            arg_index += 1


class FakeJobFlow(object):
    """In-memory job flow.

    Instance groups live on the backend and are referenced here by id, so
    DescribeJobFlows reflects later AddInstanceGroups/ModifyInstanceGroups
    calls.  The `initial_*` attributes are fallbacks used until explicit
    instance groups are attached.
    """

    def __init__(self, job_id, name, log_uri, steps, instance_attrs):
        self.id = job_id
        self.name = name
        self.log_uri = log_uri
        self.state = "STARTING"
        self.steps = []
        self.add_steps(steps)

        # Creation-time instance attributes, used only while no groups exist.
        self.initial_instance_count = instance_attrs.get('instance_count', 0)
        self.initial_master_instance_type = instance_attrs.get('master_instance_type')
        self.initial_slave_instance_type = instance_attrs.get('slave_instance_type')

        self.ec2_key_name = instance_attrs.get('ec2_key_name')
        self.availability_zone = instance_attrs.get('placement.availability_zone')
        self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
        self.termination_protected = instance_attrs.get('termination_protected')

        self.instance_group_ids = []

    def terminate(self):
        self.state = 'TERMINATED'

    def add_steps(self, steps):
        for step in steps:
            # The very first step starts immediately; any step added while
            # others already exist is queued behind them.
            state = 'PENDING' if self.steps else 'STARTING'
            self.steps.append(FakeStep(state=state, **step))

    def add_instance_group(self, instance_group_id):
        self.instance_group_ids.append(instance_group_id)

    @property
    def instance_groups(self):
        # Resolved through the module-level backend singleton (defined below
        # in this module) so group modifications are always visible.
        return emr_backend.get_instance_groups(self.instance_group_ids)

    @property
    def master_instance_type(self):
        groups = self.instance_groups
        if groups:
            # bugfix: the original computed this value but never returned it,
            # so the property yielded None whenever groups existed.
            # NOTE(review): as in the original, this reads the first group,
            # which is not necessarily the MASTER group — confirm intent.
            return groups[0].type
        return self.initial_master_instance_type

    @property
    def slave_instance_type(self):
        groups = self.instance_groups
        if groups:
            # bugfix: same missing `return` as master_instance_type.
            return groups[0].type
        return self.initial_slave_instance_type

    @property
    def instance_count(self):
        """Total instances across all groups, or the creation-time count."""
        groups = self.instance_groups
        if not groups:
            # No groups: fall back to the count given at RunJobFlow time.
            return self.initial_instance_count
        return sum(int(group.num_instances) for group in groups)
# --- moto/emr/models.py (continued): the backend registry ---

class ElasticMapReduceBackend(BaseBackend):
    """Registry of fake job flows and instance groups, keyed by id."""

    def __init__(self):
        self.job_flows = {}
        self.instance_groups = {}

    def run_job_flow(self, name, log_uri, steps, instance_attrs):
        """Create a new FakeJobFlow, register it, and return it."""
        job_id = random_job_id()
        job_flow = FakeJobFlow(job_id, name, log_uri, steps, instance_attrs)
        self.job_flows[job_id] = job_flow
        return job_flow

    def add_job_flow_steps(self, job_flow_id, steps):
        """Append steps to an existing flow; raises KeyError on unknown id."""
        job_flow = self.job_flows[job_flow_id]
        job_flow.add_steps(steps)
        return job_flow

    def describe_job_flows(self):
        return self.job_flows.values()

    def terminate_job_flows(self, job_ids):
        """Mark every flow whose id is in `job_ids` as TERMINATED."""
        flows = [flow for flow in self.describe_job_flows() if flow.id in job_ids]
        for flow in flows:
            flow.terminate()
        return flows

    def get_instance_groups(self, instance_group_ids):
        # Look up by id (silently skipping unknown ids, as before) so the
        # result preserves the caller's requested order instead of the
        # arbitrary iteration order of the backing dict.
        return [
            self.instance_groups[group_id]
            for group_id in instance_group_ids
            if group_id in self.instance_groups
        ]

    def add_instance_groups(self, job_flow_id, instance_groups):
        """Create FakeInstanceGroups from parsed params and attach them to the flow."""
        job_flow = self.job_flows[job_flow_id]
        result_groups = []
        for instance_group in instance_groups:
            instance_group_id = random_instance_group_id()
            group = FakeInstanceGroup(instance_group_id, **instance_group)
            self.instance_groups[instance_group_id] = group
            job_flow.add_instance_group(instance_group_id)
            result_groups.append(group)
        return result_groups

    def modify_instance_groups(self, instance_groups):
        """Resize existing groups; returns the modified group objects."""
        result_groups = []
        for instance_group in instance_groups:
            group = self.instance_groups[instance_group['instance_group_id']]
            group.set_instance_count(instance_group['instance_count'])
            # bugfix: the original never appended, so this method always
            # returned an empty list.
            result_groups.append(group)
        return result_groups


emr_backend = ElasticMapReduceBackend()

# --- moto/emr/responses.py ---
from jinja2 import Template

from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores
from .models import emr_backend


class ElasticMapReduceResponse(BaseResponse):
    """Translates flattened EMR query-string requests into backend calls and
    renders the XML response templates defined below in this module."""

    def _get_param(self, param_name):
        # querystring values are lists; take the first or None.
        return self.querystring.get(param_name, [None])[0]

    def _get_multi_param(self, param_prefix):
        """All values whose key starts with `param_prefix` (e.g. 'JobFlowIds.member.')."""
        return [value[0] for key, value in self.querystring.items() if key.startswith(param_prefix)]

    def _get_dict_param(self, param_prefix):
        """Flatten 'Prefix.CamelKey' params into {'camel_key': value}."""
        return {
            camelcase_to_underscores(key.replace(param_prefix, "")): value[0]
            for key, value
            in self.querystring.items()
            if key.startswith(param_prefix)
        }

    def _get_list_prefix(self, param_prefix):
        """Parse 'Prefix.N.Key' params into a list of dicts, one per index N,
        stopping at the first index with no parameters."""
        results = []
        param_index = 1
        while True:
            index_prefix = "{}.{}.".format(param_prefix, param_index)
            new_items = {
                camelcase_to_underscores(key.replace(index_prefix, "")): value[0]
                for key, value in self.querystring.items()
                if key.startswith(index_prefix)
            }
            if not new_items:
                break
            results.append(new_items)
            param_index += 1
        return results

    def add_job_flow_steps(self):
        job_flow_id = self._get_param('JobFlowId')
        steps = self._get_list_prefix('Steps.member')

        job_flow = emr_backend.add_job_flow_steps(job_flow_id, steps)
        template = Template(ADD_JOB_FLOW_STEPS_TEMPLATE)
        return template.render(job_flow=job_flow)

    def run_job_flow(self):
        flow_name = self._get_param('Name')
        log_uri = self._get_param('LogUri')
        steps = self._get_list_prefix('Steps.member')
        instance_attrs = self._get_dict_param('Instances.')

        job_flow = emr_backend.run_job_flow(flow_name, log_uri, steps, instance_attrs)
        template = Template(RUN_JOB_FLOW_TEMPLATE)
        return template.render(job_flow=job_flow)

    def describe_job_flows(self):
        job_flows = emr_backend.describe_job_flows()
        template = Template(DESCRIBE_JOB_FLOWS_TEMPLATE)
        return template.render(job_flows=job_flows)

    def terminate_job_flows(self):
        job_ids = self._get_multi_param('JobFlowIds.member.')
        job_flows = emr_backend.terminate_job_flows(job_ids)
        template = Template(TERMINATE_JOB_FLOWS_TEMPLATE)
        return template.render(job_flows=job_flows)

    def add_instance_groups(self):
        jobflow_id = self._get_param('JobFlowId')
        instance_groups = self._get_list_prefix('InstanceGroups.member')
        instance_groups = emr_backend.add_instance_groups(jobflow_id, instance_groups)
        template = Template(ADD_INSTANCE_GROUPS_TEMPLATE)
        return template.render(instance_groups=instance_groups)

    def modify_instance_groups(self):
        instance_groups = self._get_list_prefix('InstanceGroups.member')
        instance_groups = emr_backend.modify_instance_groups(instance_groups)
        template = Template(MODIFY_INSTANCE_GROUPS_TEMPLATE)
        return template.render(instance_groups=instance_groups)
# --- moto/emr/responses.py (continued): XML response templates ---
# NOTE(review): the XML element tags inside these template strings were lost
# in extraction; the markup below is reconstructed from the EMR API response
# shape that the accompanying boto-based tests parse (DescribeJobFlows /
# RunJobFlow / AddInstanceGroups result elements).  The Jinja expressions and
# request-id values are taken verbatim from the original.  Confirm against
# the upstream file.

RUN_JOB_FLOW_TEMPLATE = """<RunJobFlowResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <RunJobFlowResult>
      <JobFlowId>{{ job_flow.id }}</JobFlowId>
   </RunJobFlowResult>
   <ResponseMetadata>
      <RequestId>
         8296d8b8-ed85-11dd-9877-6fad448a8419
      </RequestId>
   </ResponseMetadata>
</RunJobFlowResponse>"""

DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <DescribeJobFlowsResult>
      <JobFlows>
         {% for job_flow in job_flows %}
         <member>
            <ExecutionStatusDetail>
               <CreationDateTime>2009-01-28T21:49:16Z</CreationDateTime>
               <StartDateTime>2009-01-28T21:49:16Z</StartDateTime>
               <State>{{ job_flow.state }}</State>
            </ExecutionStatusDetail>
            <Name>{{ job_flow.name }}</Name>
            <LogUri>{{ job_flow.log_uri }}</LogUri>
            <Steps>
               {% for step in job_flow.steps %}
               <member>
                  <ExecutionStatusDetail>
                     <CreationDateTime>2009-01-28T21:49:16Z</CreationDateTime>
                     <State>{{ step.state }}</State>
                  </ExecutionStatusDetail>
                  <StepConfig>
                     <HadoopJarStep>
                        <Jar>{{ step.jar }}</Jar>
                        <MainClass>MyMainClass</MainClass>
                        <Args>
                           {% for arg in step.args %}
                           <member>{{ arg }}</member>
                           {% endfor %}
                        </Args>
                     </HadoopJarStep>
                     <Name>{{ step.name }}</Name>
                     <ActionOnFailure>CONTINUE</ActionOnFailure>
                  </StepConfig>
               </member>
               {% endfor %}
            </Steps>
            <JobFlowId>{{ job_flow.id }}</JobFlowId>
            <Instances>
               <Placement>
                  <AvailabilityZone>us-east-1a</AvailabilityZone>
               </Placement>
               <SlaveInstanceType>{{ job_flow.slave_instance_type }}</SlaveInstanceType>
               <MasterInstanceType>{{ job_flow.master_instance_type }}</MasterInstanceType>
               <Ec2KeyName>{{ job_flow.ec2_key_name }}</Ec2KeyName>
               <InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
               <KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
               <TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
               <InstanceGroups>
                  {% for instance_group in job_flow.instance_groups %}
                  <member>
                     <InstanceGroupId>{{ instance_group.id }}</InstanceGroupId>
                     <InstanceRole>{{ instance_group.role }}</InstanceRole>
                     <InstanceRunningCount>{{ instance_group.num_instances }}</InstanceRunningCount>
                     <InstanceType>{{ instance_group.type }}</InstanceType>
                     <Market>{{ instance_group.market }}</Market>
                     <Name>{{ instance_group.name }}</Name>
                     <BidPrice>{{ instance_group.bid_price }}</BidPrice>
                  </member>
                  {% endfor %}
               </InstanceGroups>
            </Instances>
         </member>
         {% endfor %}
      </JobFlows>
   </DescribeJobFlowsResult>
   <ResponseMetadata>
      <RequestId>
         9cea3229-ed85-11dd-9877-6fad448a8419
      </RequestId>
   </ResponseMetadata>
</DescribeJobFlowsResponse>"""

TERMINATE_JOB_FLOWS_TEMPLATE = """<TerminateJobFlowsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <ResponseMetadata>
      <RequestId>
         2690d7eb-ed86-11dd-9877-6fad448a8419
      </RequestId>
   </ResponseMetadata>
</TerminateJobFlowsResponse>"""

ADD_JOB_FLOW_STEPS_TEMPLATE = """<AddJobFlowStepsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <ResponseMetadata>
      <RequestId>
         df6f4f4a-ed85-11dd-9877-6fad448a8419
      </RequestId>
   </ResponseMetadata>
</AddJobFlowStepsResponse>"""

# boto splits this comma-separated element back into individual group ids.
ADD_INSTANCE_GROUPS_TEMPLATE = """<AddInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <InstanceGroupIds>{% for instance_group in instance_groups %}{{ instance_group.id }}{% if loop.index != loop.length %},{% endif %}{% endfor %}</InstanceGroupIds>
</AddInstanceGroupsResponse>"""

MODIFY_INSTANCE_GROUPS_TEMPLATE = """<ModifyInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
   <ResponseMetadata>
      <RequestId>
         2690d7eb-ed86-11dd-9877-6fad448a8419
      </RequestId>
   </ResponseMetadata>
</ModifyInstanceGroupsResponse>"""

# --- moto/emr/urls.py ---
from .responses import ElasticMapReduceResponse

url_bases = [
    "https?://elasticmapreduce.(.+).amazonaws.com",
]

url_paths = {
    '{0}/$': ElasticMapReduceResponse().dispatch,
}
# --- moto/emr/utils.py ---
import random
import string


def _random_id(prefix, size):
    """Build an id like 'j-1A2B3C4D5E6F7' — `size` chars from digits +
    ASCII uppercase letters, behind the given prefix.

    Replaces two near-identical functions: the original built its pool as
    `range(10) + list(string.uppercase)` and had to unicode() every pick;
    `string.digits + string.ascii_uppercase` is the same character set,
    locale-independent, and works on both Python 2 and 3.
    """
    chars = string.digits + string.ascii_uppercase
    tag = ''.join(random.choice(chars) for _ in range(size))
    return '{0}-{1}'.format(prefix, tag)


def random_job_id(size=13):
    return _random_id('j', size)


def random_instance_group_id(size=13):
    return _random_id('i', size)

# --- tests/test_emr/test_emr.py ---
import boto
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep
import sure  # noqa

from moto import mock_emr


@mock_emr
def test_create_job_flow():
    """RunJobFlow + DescribeJobFlows round-trip: flow attributes, and the
    first step STARTING while the second is PENDING."""
    conn = boto.connect_emr()

    step1 = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output'
    )

    step2 = StreamingStep(
        name='My wordcount example2',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input2',
        output='s3n://output_bucket/output/wordcount_output2'
    )

    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        master_instance_type='m1.medium',
        slave_instance_type='m1.small',
        steps=[step1, step2]
    )

    job_flow = conn.describe_jobflow(job_id)
    job_flow.state.should.equal('STARTING')
    job_flow.jobflowid.should.equal(job_id)
    job_flow.name.should.equal('My jobflow')
    job_flow.masterinstancetype.should.equal('m1.medium')
    job_flow.slaveinstancetype.should.equal('m1.small')
    job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')

    job_step = job_flow.steps[0]
    job_step.name.should.equal('My wordcount example')
    job_step.state.should.equal('STARTING')
    args = [arg.value for arg in job_step.args]
    args.should.equal([
        '-mapper',
        's3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        '-reducer',
        'aggregate',
        '-input',
        's3n://elasticmapreduce/samples/wordcount/input',
        '-output',
        's3n://output_bucket/output/wordcount_output',
    ])

    job_step2 = job_flow.steps[1]
    job_step2.name.should.equal('My wordcount example2')
    job_step2.state.should.equal('PENDING')
    args = [arg.value for arg in job_step2.args]
    args.should.equal([
        '-mapper',
        's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
        '-reducer',
        'aggregate',
        '-input',
        's3n://elasticmapreduce/samples/wordcount/input2',
        '-output',
        's3n://output_bucket/output/wordcount_output2',
    ])
@mock_emr
def test_terminate_job_flow():
    """Terminating a flow flips its state from STARTING to TERMINATED."""
    emr_conn = boto.connect_emr()
    flow_id = emr_conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[]
    )

    listed_flow = emr_conn.describe_jobflows()[0]
    listed_flow.state.should.equal('STARTING')

    emr_conn.terminate_jobflow(flow_id)

    listed_flow = emr_conn.describe_jobflows()[0]
    listed_flow.state.should.equal('TERMINATED')


@mock_emr
def test_add_steps_to_flow():
    """A step added after creation shows up as PENDING behind the first."""
    emr_conn = boto.connect_emr()

    first_step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output'
    )

    flow_id = emr_conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[first_step]
    )

    described = emr_conn.describe_jobflow(flow_id)
    described.state.should.equal('STARTING')
    described.jobflowid.should.equal(flow_id)
    described.name.should.equal('My jobflow')
    described.loguri.should.equal('s3://some_bucket/jobflow_logs')

    second_step = StreamingStep(
        name='My wordcount example2',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input2',
        output='s3n://output_bucket/output/wordcount_output2'
    )

    emr_conn.add_jobflow_steps(flow_id, [second_step])

    described = emr_conn.describe_jobflow(flow_id)

    running_step = described.steps[0]
    running_step.name.should.equal('My wordcount example')
    running_step.state.should.equal('STARTING')
    observed_args = [arg.value for arg in running_step.args]
    observed_args.should.equal([
        '-mapper',
        's3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        '-reducer',
        'aggregate',
        '-input',
        's3n://elasticmapreduce/samples/wordcount/input',
        '-output',
        's3n://output_bucket/output/wordcount_output',
    ])

    pending_step = described.steps[1]
    pending_step.name.should.equal('My wordcount example2')
    pending_step.state.should.equal('PENDING')
    observed_args = [arg.value for arg in pending_step.args]
    observed_args.should.equal([
        '-mapper',
        's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
        '-reducer',
        'aggregate',
        '-input',
        's3n://elasticmapreduce/samples/wordcount/input2',
        '-output',
        's3n://output_bucket/output/wordcount_output2',
    ])
@mock_emr
def test_create_instance_groups():
    """AddInstanceGroups attaches a group whose attributes are echoed back
    by DescribeJobFlows, and the flow's instance count follows the group."""
    emr_conn = boto.connect_emr()

    wordcount_step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output'
    )

    flow_id = emr_conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[wordcount_step]
    )

    new_group = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')
    add_result = emr_conn.add_instance_groups(flow_id, [new_group])
    created_group_id = add_result.instancegroupids

    described = emr_conn.describe_jobflows()[0]
    int(described.instancecount).should.equal(6)

    described_group = described.instancegroups[0]
    described_group.instancegroupid.should.equal(created_group_id)
    int(described_group.instancerunningcount).should.equal(6)
    described_group.instancerole.should.equal('TASK')
    described_group.instancetype.should.equal('c1.medium')
    described_group.market.should.equal('SPOT')
    described_group.name.should.equal('spot-0.07')
    described_group.bidprice.should.equal('0.07')


@mock_emr
def test_modify_instance_groups():
    """ModifyInstanceGroups resizes each group individually and the flow's
    total instance count tracks the sum of the new sizes."""
    emr_conn = boto.connect_emr()

    wordcount_step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output'
    )

    flow_id = emr_conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[wordcount_step]
    )

    group_a = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')
    group_b = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')
    add_result = emr_conn.add_instance_groups(flow_id, [group_a, group_b])
    created_group_ids = add_result.instancegroupids.split(",")

    described = emr_conn.describe_jobflows()[0]
    int(described.instancecount).should.equal(12)
    int(described.instancegroups[0].instancerunningcount).should.equal(6)

    # Shrink the two groups to 2 and 3 instances respectively.
    emr_conn.modify_instance_groups(created_group_ids, [2, 3])

    described = emr_conn.describe_jobflows()[0]
    int(described.instancecount).should.equal(5)

    groups_by_id = dict(
        (group.instancegroupid, group) for group in described.instancegroups
    )
    int(groups_by_id[created_group_ids[0]].instancerunningcount).should.equal(2)
    int(groups_by_id[created_group_ids[1]].instancerunningcount).should.equal(3)
# --- tests/test_emr/test_server.py ---
import sure  # noqa

import moto.server as server

'''
Test the different server responses
'''
server.configure_urls("emr")


def test_describe_jobflows():
    """The standalone server answers DescribeJobFlows with the result XML."""
    test_client = server.app.test_client()
    res = test_client.get('/?Action=DescribeJobFlows')

    # NOTE(review): the expected substrings in the original assertions were
    # lost in extraction (both read should.contain('')); reconstructed here
    # from the structure of DESCRIBE_JOB_FLOWS_TEMPLATE — confirm upstream.
    res.data.should.contain('<DescribeJobFlowsResult>')
    res.data.should.contain('<JobFlows>')