basic emr done
This commit is contained in:
parent
02fa630a3c
commit
cea25e75c5
@ -5,6 +5,7 @@ from .autoscaling import mock_autoscaling
|
|||||||
from .dynamodb import mock_dynamodb
|
from .dynamodb import mock_dynamodb
|
||||||
from .ec2 import mock_ec2
|
from .ec2 import mock_ec2
|
||||||
from .elb import mock_elb
|
from .elb import mock_elb
|
||||||
|
from .emr import mock_emr
|
||||||
from .s3 import mock_s3
|
from .s3 import mock_s3
|
||||||
from .ses import mock_ses
|
from .ses import mock_ses
|
||||||
from .sqs import mock_sqs
|
from .sqs import mock_sqs
|
||||||
|
2
moto/emr/__init__.py
Normal file
2
moto/emr/__init__.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
from .models import emr_backend

# Decorator / context manager that routes boto's EMR calls to the in-memory backend.
mock_emr = emr_backend.decorator
167
moto/emr/models.py
Normal file
167
moto/emr/models.py
Normal file
@ -0,0 +1,167 @@
|
|||||||
|
from moto.core import BaseBackend
|
||||||
|
|
||||||
|
from .utils import random_job_id, random_instance_group_id
|
||||||
|
|
||||||
|
|
||||||
|
class FakeInstanceGroup(object):
    """In-memory stand-in for a single EMR instance group."""

    def __init__(self, id, instance_count, instance_role, instance_type, market, name, bid_price=None):
        # Parameter names mirror the flattened querystring keys produced by
        # the request parser; they are copied straight onto the object.
        self.id = id
        self.name = name
        self.market = market
        self.role = instance_role
        self.type = instance_type
        self.bid_price = bid_price
        self.num_instances = instance_count

    def set_instance_count(self, instance_count):
        """Resize the group (backs ModifyInstanceGroups)."""
        self.num_instances = instance_count
|
||||||
|
|
||||||
|
|
||||||
|
class FakeStep(object):
    """In-memory model of one job-flow step.

    Built from the flattened querystring of a RunJobFlow/AddJobFlowSteps
    request, e.g.::

        'Steps.member.1.HadoopJarStep.Jar': ['/home/hadoop/contrib/streaming/hadoop-streaming.jar'],
        'Steps.member.1.HadoopJarStep.Args.member.1': ['-mapper'],
        'Steps.member.1.HadoopJarStep.Args.member.2': ['s3n://elasticmapreduce/samples/wordcount/wordSplitter.py'],
        ...
        'Steps.member.1.ActionOnFailure': ['TERMINATE_JOB_FLOW'],
        'Steps.member.1.Name': ['My wordcount example']
    """

    def __init__(self, state, **kwargs):
        self.state = state
        self.name = kwargs['name']
        self.jar = kwargs['hadoop_jar_step._jar']
        self.action_on_failure = kwargs['action_on_failure']

        # Collect the 1-based, consecutively numbered step arguments until the
        # first index that is missing (or empty).
        self.args = []
        index = 1
        arg = kwargs.get('hadoop_jar_step._args.member.1')
        while arg:
            self.args.append(arg)
            index += 1
            arg = kwargs.get('hadoop_jar_step._args.member.{}'.format(index))
|
||||||
|
|
||||||
|
|
||||||
|
class FakeJobFlow(object):
    """In-memory model of an EMR job flow (cluster).

    ``instance_attrs`` is the flattened ``Instances.*`` querystring dict; the
    ``initial_*`` values are reported until instance groups are attached.
    """

    def __init__(self, job_id, name, log_uri, steps, instance_attrs):
        self.id = job_id
        self.name = name
        self.log_uri = log_uri
        self.state = "STARTING"
        self.steps = []
        self.add_steps(steps)

        # Fallbacks used while the flow has no explicit instance groups.
        self.initial_instance_count = instance_attrs.get('instance_count', 0)
        self.initial_master_instance_type = instance_attrs.get('master_instance_type')
        self.initial_slave_instance_type = instance_attrs.get('slave_instance_type')

        self.ec2_key_name = instance_attrs.get('ec2_key_name')
        self.availability_zone = instance_attrs.get('placement.availability_zone')
        self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
        self.termination_protected = instance_attrs.get('termination_protected')

        self.instance_group_ids = []

    def terminate(self):
        """Mark the flow terminated (backs TerminateJobFlows)."""
        self.state = 'TERMINATED'

    def add_steps(self, steps):
        """Append steps; only the first step of an empty flow starts immediately."""
        for step in steps:
            if self.steps:
                # If we already have other steps, this one is pending
                self.steps.append(FakeStep(state='PENDING', **step))
            else:
                self.steps.append(FakeStep(state='STARTING', **step))

    def add_instance_group(self, instance_group_id):
        """Record an instance-group id owned by this flow."""
        self.instance_group_ids.append(instance_group_id)

    @property
    def instance_groups(self):
        return emr_backend.get_instance_groups(self.instance_group_ids)

    @property
    def master_instance_type(self):
        # BUG FIX: the original evaluated ``groups[0].type`` without `return`,
        # so this property yielded None once any instance group existed.
        groups = self.instance_groups
        if groups:
            return groups[0].type
        else:
            return self.initial_master_instance_type

    @property
    def slave_instance_type(self):
        # BUG FIX: same missing `return` as master_instance_type.
        groups = self.instance_groups
        if groups:
            return groups[0].type
        else:
            return self.initial_slave_instance_type

    @property
    def instance_count(self):
        """Total instance count across all groups, or the initial request count."""
        groups = self.instance_groups
        if not groups:
            # No groups, return initial instance count
            return self.initial_instance_count
        return sum(int(group.num_instances) for group in groups)
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticMapReduceBackend(BaseBackend):
    """In-memory EMR backend: job flows and instance groups keyed by id."""

    def __init__(self):
        self.job_flows = {}
        self.instance_groups = {}

    def run_job_flow(self, name, log_uri, steps, instance_attrs):
        """Create, register, and return a new job flow."""
        job_id = random_job_id()
        job_flow = FakeJobFlow(job_id, name, log_uri, steps, instance_attrs)
        self.job_flows[job_id] = job_flow
        return job_flow

    def add_job_flow_steps(self, job_flow_id, steps):
        """Append steps to an existing flow; raises KeyError for unknown ids."""
        job_flow = self.job_flows[job_flow_id]
        job_flow.add_steps(steps)
        return job_flow

    def describe_job_flows(self):
        """Return all known job flows."""
        return self.job_flows.values()

    def terminate_job_flows(self, job_ids):
        """Terminate every known flow whose id is in ``job_ids``; unknown ids are ignored."""
        flows = [flow for flow in self.describe_job_flows() if flow.id in job_ids]
        for flow in flows:
            flow.terminate()
        return flows

    def get_instance_groups(self, instance_group_ids):
        """Return the registered groups matching the given ids."""
        return [
            group for group_id, group
            in self.instance_groups.items()
            if group_id in instance_group_ids
        ]

    def add_instance_groups(self, job_flow_id, instance_groups):
        """Create instance groups from request dicts and attach them to a flow."""
        job_flow = self.job_flows[job_flow_id]
        result_groups = []
        for instance_group in instance_groups:
            instance_group_id = random_instance_group_id()
            group = FakeInstanceGroup(instance_group_id, **instance_group)
            self.instance_groups[instance_group_id] = group
            job_flow.add_instance_group(instance_group_id)
            result_groups.append(group)
        return result_groups

    def modify_instance_groups(self, instance_groups):
        """Resize existing instance groups and return the modified groups.

        BUG FIX: the original never appended to ``result_groups`` and therefore
        always returned an empty list.
        """
        result_groups = []
        for instance_group in instance_groups:
            group = self.instance_groups[instance_group['instance_group_id']]
            group.set_instance_count(instance_group['instance_count'])
            result_groups.append(group)
        return result_groups


emr_backend = ElasticMapReduceBackend()
|
192
moto/emr/responses.py
Normal file
192
moto/emr/responses.py
Normal file
@ -0,0 +1,192 @@
|
|||||||
|
from jinja2 import Template
|
||||||
|
|
||||||
|
from moto.core.responses import BaseResponse
|
||||||
|
from moto.core.utils import camelcase_to_underscores
|
||||||
|
from .models import emr_backend
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticMapReduceResponse(BaseResponse):
    """Dispatches EMR query-protocol actions against the in-memory backend."""

    def _get_param(self, param_name):
        # Querystring values arrive as lists; unwrap the first entry.
        return self.querystring.get(param_name, [None])[0]

    def _get_multi_param(self, param_prefix):
        # Every value whose key starts with the prefix, e.g. JobFlowIds.member.N.
        return [value[0] for key, value in self.querystring.items() if key.startswith(param_prefix)]

    def _get_dict_param(self, param_prefix):
        # Strip the prefix from matching keys and snake_case the remainder.
        params = {}
        for key, value in self.querystring.items():
            if key.startswith(param_prefix):
                params[camelcase_to_underscores(key.replace(param_prefix, ""))] = value[0]
        return params

    def _get_list_prefix(self, param_prefix):
        # Walk Prefix.1.*, Prefix.2.*, ... collecting one dict per index until
        # an index contributes nothing.
        results = []
        param_index = 1
        while True:
            index_prefix = "{}.{}.".format(param_prefix, param_index)
            new_items = {}
            for key, value in self.querystring.items():
                if key.startswith(index_prefix):
                    new_items[camelcase_to_underscores(key.replace(index_prefix, ""))] = value[0]
            if not new_items:
                break
            results.append(new_items)
            param_index += 1
        return results

    def add_job_flow_steps(self):
        job_flow_id = self._get_param('JobFlowId')
        steps = self._get_list_prefix('Steps.member')

        job_flow = emr_backend.add_job_flow_steps(job_flow_id, steps)
        return Template(ADD_JOB_FLOW_STEPS_TEMPLATE).render(job_flow=job_flow)

    def run_job_flow(self):
        flow_name = self._get_param('Name')
        log_uri = self._get_param('LogUri')
        steps = self._get_list_prefix('Steps.member')
        instance_attrs = self._get_dict_param('Instances.')

        job_flow = emr_backend.run_job_flow(flow_name, log_uri, steps, instance_attrs)
        return Template(RUN_JOB_FLOW_TEMPLATE).render(job_flow=job_flow)

    def describe_job_flows(self):
        job_flows = emr_backend.describe_job_flows()
        return Template(DESCRIBE_JOB_FLOWS_TEMPLATE).render(job_flows=job_flows)

    def terminate_job_flows(self):
        job_ids = self._get_multi_param('JobFlowIds.member.')
        job_flows = emr_backend.terminate_job_flows(job_ids)
        return Template(TERMINATE_JOB_FLOWS_TEMPLATE).render(job_flows=job_flows)

    def add_instance_groups(self):
        jobflow_id = self._get_param('JobFlowId')
        groups = emr_backend.add_instance_groups(
            jobflow_id,
            self._get_list_prefix('InstanceGroups.member'),
        )
        return Template(ADD_INSTANCE_GROUPS_TEMPLATE).render(instance_groups=groups)

    def modify_instance_groups(self):
        groups = emr_backend.modify_instance_groups(
            self._get_list_prefix('InstanceGroups.member'),
        )
        return Template(MODIFY_INSTANCE_GROUPS_TEMPLATE).render(instance_groups=groups)
|
||||||
|
|
||||||
|
|
||||||
|
# Jinja template for the RunJobFlow response: echoes the new job-flow id.
RUN_JOB_FLOW_TEMPLATE = """<RunJobFlowResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<RunJobFlowResult>
<JobFlowId>{{ job_flow.id }}</JobFlowId>
</RunJobFlowResult>
<ResponseMetadata>
<RequestId>
8296d8b8-ed85-11dd-9877-6fad448a8419
</RequestId>
</ResponseMetadata>
</RunJobFlowResponse>"""

# Jinja template for DescribeJobFlows: renders every flow with its steps,
# instances, and instance groups. Dates/zone/MainClass are fixed placeholders.
DESCRIBE_JOB_FLOWS_TEMPLATE = """<DescribeJobFlowsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<DescribeJobFlowsResult>
<JobFlows>
{% for job_flow in job_flows %}
<member>
<ExecutionStatusDetail>
<CreationDateTime>2009-01-28T21:49:16Z</CreationDateTime>
<StartDateTime>2009-01-28T21:49:16Z</StartDateTime>
<State>{{ job_flow.state }}</State>
</ExecutionStatusDetail>
<Name>{{ job_flow.name }}</Name>
<LogUri>{{ job_flow.log_uri }}</LogUri>
<Steps>
{% for step in job_flow.steps %}
<member>
<ExecutionStatusDetail>
<CreationDateTime>2009-01-28T21:49:16Z</CreationDateTime>
<State>{{ step.state }}</State>
</ExecutionStatusDetail>
<StepConfig>
<HadoopJarStep>
<Jar>{{ step.jar }}</Jar>
<MainClass>MyMainClass</MainClass>
<Args>
{% for arg in step.args %}
<member>{{ arg }}</member>
{% endfor %}
</Args>
<Properties/>
</HadoopJarStep>
<Name>{{ step.name }}</Name>
<ActionOnFailure>CONTINUE</ActionOnFailure>
</StepConfig>
</member>
{% endfor %}
</Steps>
<JobFlowId>{{ job_flow.id }}</JobFlowId>
<Instances>
<Placement>
<AvailabilityZone>us-east-1a</AvailabilityZone>
</Placement>
<SlaveInstanceType>{{ job_flow.slave_instance_type }}</SlaveInstanceType>
<MasterInstanceType>{{ job_flow.master_instance_type }}</MasterInstanceType>
<Ec2KeyName>{{ job_flow.ec2_key_name }}</Ec2KeyName>
<InstanceCount>{{ job_flow.instance_count }}</InstanceCount>
<KeepJobFlowAliveWhenNoSteps>{{ job_flow.keep_job_flow_alive_when_no_steps }}</KeepJobFlowAliveWhenNoSteps>
<TerminationProtected>{{ job_flow.termination_protected }}</TerminationProtected>
<InstanceGroups>
{% for instance_group in job_flow.instance_groups %}
<member>
<InstanceGroupId>{{ instance_group.id }}</InstanceGroupId>
<InstanceRole>{{ instance_group.role }}</InstanceRole>
<InstanceRunningCount>{{ instance_group.num_instances }}</InstanceRunningCount>
<InstanceType>{{ instance_group.type }}</InstanceType>
<Market>{{ instance_group.market }}</Market>
<Name>{{ instance_group.name }}</Name>
<BidPrice>{{ instance_group.bid_price }}</BidPrice>
</member>
{% endfor %}
</InstanceGroups>
</Instances>
</member>
{% endfor %}
</JobFlows>
</DescribeJobFlowsResult>
<ResponseMetadata>
<RequestId>
9cea3229-ed85-11dd-9877-6fad448a8419
</RequestId>
</ResponseMetadata>
</DescribeJobFlowsResponse>"""

# TerminateJobFlows returns only metadata (no result body).
TERMINATE_JOB_FLOWS_TEMPLATE = """<TerminateJobFlowsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ResponseMetadata>
<RequestId>
2690d7eb-ed86-11dd-9877-6fad448a8419
</RequestId>
</ResponseMetadata>
</TerminateJobFlowsResponse>"""

# AddJobFlowSteps returns only metadata (no result body).
ADD_JOB_FLOW_STEPS_TEMPLATE = """<AddJobFlowStepsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ResponseMetadata>
<RequestId>
df6f4f4a-ed85-11dd-9877-6fad448a8419
</RequestId>
</ResponseMetadata>
</AddJobFlowStepsResponse>"""

# AddInstanceGroups: comma-joined list of the new group ids.
ADD_INSTANCE_GROUPS_TEMPLATE = """<AddInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<InstanceGroupIds>{% for instance_group in instance_groups %}{{ instance_group.id }}{% if loop.index != loop.length %},{% endif %}{% endfor %}</InstanceGroupIds>
</AddInstanceGroupsResponse>"""

# ModifyInstanceGroups returns only metadata (no result body).
MODIFY_INSTANCE_GROUPS_TEMPLATE = """<ModifyInstanceGroupsResponse xmlns="http://elasticmapreduce.amazonaws.com/doc/2009-03-31">
<ResponseMetadata>
<RequestId>
2690d7eb-ed86-11dd-9877-6fad448a8419
</RequestId>
</ResponseMetadata>
</ModifyInstanceGroupsResponse>"""
|
9
moto/emr/urls.py
Normal file
9
moto/emr/urls.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
from .responses import ElasticMapReduceResponse

# Hostnames the mock intercepts; the group captures the region.
url_bases = [
    "https?://elasticmapreduce.(.+).amazonaws.com",
]

# Every EMR query-protocol action is sent to the service root and routed
# to the matching ElasticMapReduceResponse method by Action name.
url_paths = {
    '{0}/$': ElasticMapReduceResponse().dispatch,
}
|
14
moto/emr/utils.py
Normal file
14
moto/emr/utils.py
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
import random
import string


def _random_id(prefix, size):
    """Return ``<prefix>-`` followed by ``size`` random digits / uppercase letters.

    BUG FIX: the original built ``range(10) + list(string.uppercase)`` and
    called ``unicode`` — all Python-2-only (TypeError/NameError/AttributeError
    on Python 3). ``string.digits`` provides the same ten digit characters and
    ``string.ascii_uppercase`` exists on both Python 2 and 3.
    """
    chars = string.digits + string.ascii_uppercase
    tag = ''.join(random.choice(chars) for _ in range(size))
    return '{}-{}'.format(prefix, tag)


def random_job_id(size=13):
    """Random job-flow id, e.g. ``j-3H8A7T2C0QHBH``."""
    return _random_id('j', size)


def random_instance_group_id(size=13):
    """Random instance-group id, e.g. ``i-1B2C3D4E5F6G7``."""
    return _random_id('i', size)
|
233
tests/test_emr/test_emr.py
Normal file
233
tests/test_emr/test_emr.py
Normal file
@ -0,0 +1,233 @@
|
|||||||
|
import boto
|
||||||
|
from boto.emr.instance_group import InstanceGroup
|
||||||
|
from boto.emr.step import StreamingStep
|
||||||
|
import sure # noqa
|
||||||
|
|
||||||
|
from moto import mock_emr
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
def test_create_job_flow():
    # Launch a flow with two streaming steps, then confirm every field
    # reported by DescribeJobFlow matches what was submitted. Only the first
    # step should start; the second stays pending.
    conn = boto.connect_emr()

    steps = [
        StreamingStep(
            name='My wordcount example',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input',
            output='s3n://output_bucket/output/wordcount_output',
        ),
        StreamingStep(
            name='My wordcount example2',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input2',
            output='s3n://output_bucket/output/wordcount_output2',
        ),
    ]

    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        master_instance_type='m1.medium',
        slave_instance_type='m1.small',
        steps=steps,
    )

    job_flow = conn.describe_jobflow(job_id)
    job_flow.state.should.equal('STARTING')
    job_flow.jobflowid.should.equal(job_id)
    job_flow.name.should.equal('My jobflow')
    job_flow.masterinstancetype.should.equal('m1.medium')
    job_flow.slaveinstancetype.should.equal('m1.small')
    job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')

    expected = [
        ('My wordcount example', 'STARTING', [
            '-mapper', 's3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
            '-reducer', 'aggregate',
            '-input', 's3n://elasticmapreduce/samples/wordcount/input',
            '-output', 's3n://output_bucket/output/wordcount_output',
        ]),
        ('My wordcount example2', 'PENDING', [
            '-mapper', 's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
            '-reducer', 'aggregate',
            '-input', 's3n://elasticmapreduce/samples/wordcount/input2',
            '-output', 's3n://output_bucket/output/wordcount_output2',
        ]),
    ]
    for step, (name, state, args) in zip(job_flow.steps, expected):
        step.name.should.equal(name)
        step.state.should.equal(state)
        [arg.value for arg in step.args].should.equal(args)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
def test_terminate_job_flow():
    # A freshly started flow moves to TERMINATED once terminate_jobflow runs.
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[],
    )

    conn.describe_jobflows()[0].state.should.equal('STARTING')

    conn.terminate_jobflow(job_id)

    conn.describe_jobflows()[0].state.should.equal('TERMINATED')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
def test_add_steps_to_flow():
    # Start a flow with one step, add a second via AddJobFlowSteps, and check
    # both steps (including their streaming args) come back from DescribeJobFlow.
    conn = boto.connect_emr()

    first_step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output',
    )
    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[first_step],
    )

    job_flow = conn.describe_jobflow(job_id)
    job_flow.state.should.equal('STARTING')
    job_flow.jobflowid.should.equal(job_id)
    job_flow.name.should.equal('My jobflow')
    job_flow.loguri.should.equal('s3://some_bucket/jobflow_logs')

    second_step = StreamingStep(
        name='My wordcount example2',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input2',
        output='s3n://output_bucket/output/wordcount_output2',
    )
    conn.add_jobflow_steps(job_id, [second_step])

    job_flow = conn.describe_jobflow(job_id)
    expected = [
        ('My wordcount example', 'STARTING', [
            '-mapper', 's3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
            '-reducer', 'aggregate',
            '-input', 's3n://elasticmapreduce/samples/wordcount/input',
            '-output', 's3n://output_bucket/output/wordcount_output',
        ]),
        ('My wordcount example2', 'PENDING', [
            '-mapper', 's3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
            '-reducer', 'aggregate',
            '-input', 's3n://elasticmapreduce/samples/wordcount/input2',
            '-output', 's3n://output_bucket/output/wordcount_output2',
        ]),
    ]
    for step, (name, state, args) in zip(job_flow.steps, expected):
        step.name.should.equal(name)
        step.state.should.equal(state)
        [arg.value for arg in step.args].should.equal(args)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
def test_create_instance_groups():
    # Attach a SPOT task group to a running flow and verify every attribute
    # is echoed back by DescribeJobFlows.
    conn = boto.connect_emr()

    step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output',
    )
    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[step],
    )

    new_group = InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07')
    group_id = conn.add_instance_groups(job_id, [new_group]).instancegroupids

    job_flow = conn.describe_jobflows()[0]
    int(job_flow.instancecount).should.equal(6)

    reported = job_flow.instancegroups[0]
    reported.instancegroupid.should.equal(group_id)
    int(reported.instancerunningcount).should.equal(6)
    reported.instancerole.should.equal('TASK')
    reported.instancetype.should.equal('c1.medium')
    reported.market.should.equal('SPOT')
    reported.name.should.equal('spot-0.07')
    reported.bidprice.should.equal('0.07')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_emr
def test_modify_instance_groups():
    # Resize two instance groups and confirm the per-group and total counts.
    conn = boto.connect_emr()

    step = StreamingStep(
        name='My wordcount example',
        mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
        reducer='aggregate',
        input='s3n://elasticmapreduce/samples/wordcount/input',
        output='s3n://output_bucket/output/wordcount_output',
    )
    job_id = conn.run_jobflow(
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        steps=[step],
    )

    new_groups = [
        InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07'),
        InstanceGroup(6, 'TASK', 'c1.medium', 'SPOT', 'spot-0.07', '0.07'),
    ]
    group_ids = conn.add_instance_groups(job_id, new_groups).instancegroupids.split(",")

    job_flow = conn.describe_jobflows()[0]
    int(job_flow.instancecount).should.equal(12)
    int(job_flow.instancegroups[0].instancerunningcount).should.equal(6)

    conn.modify_instance_groups(group_ids, [2, 3])

    job_flow = conn.describe_jobflows()[0]
    int(job_flow.instancecount).should.equal(5)

    def running_count(group_id):
        # Look the group up by id rather than relying on response ordering.
        matches = [g for g in job_flow.instancegroups if g.instancegroupid == group_id]
        return int(matches[0].instancerunningcount)

    running_count(group_ids[0]).should.equal(2)
    running_count(group_ids[1]).should.equal(3)
|
16
tests/test_emr/test_server.py
Normal file
16
tests/test_emr/test_server.py
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import sure  # noqa

import moto.server as server

'''
Test the different server responses
'''
server.configure_urls("emr")


def test_describe_jobflows():
    # The standalone server should answer DescribeJobFlows with the (empty)
    # job-flows XML document.
    backend = server.app.test_client()
    response = backend.get('/?Action=DescribeJobFlows')

    for fragment in ('<DescribeJobFlowsResult>', '<JobFlows>'):
        response.data.should.contain(fragment)
|
Loading…
x
Reference in New Issue
Block a user