from __future__ import unicode_literals

import boto
from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep
import six
import sure  # noqa

from moto import mock_emr
from tests.helpers import requires_boto_gte


# Baseline keyword arguments passed to ``run_jobflow`` by most tests below.
# Built with dict(...) keywords so the keys can be splatted with ``**``.
run_jobflow_args = dict(
    job_flow_role='EMR_EC2_DefaultRole',
    keep_alive=True,
    log_uri='s3://some_bucket/jobflow_logs',
    master_instance_type='c1.medium',
    name='My jobflow',
    num_instances=2,
    service_role='EMR_DefaultRole',
    slave_instance_type='c1.medium',
)


# Instance groups shared by the instance-group tests: one MASTER, one CORE
# and two SPOT TASK groups (only the SPOT groups are given a bid price).
input_instance_groups = [
    InstanceGroup(1, 'MASTER', 'c1.medium', 'ON_DEMAND', 'master'),
    InstanceGroup(3, 'CORE', 'c1.medium', 'ON_DEMAND', 'core'),
    InstanceGroup(6, 'TASK', 'c1.large', 'SPOT', 'task-1', '0.07'),
    InstanceGroup(10, 'TASK', 'c1.xlarge', 'SPOT', 'task-2', '0.05'),
]


# describe_cluster should echo back the settings of a job flow started with
# keep_alive=False, extra api_params and two tags.
@mock_emr
def test_describe_cluster():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args.update(
        api_params={
            'Applications.member.1.Name': 'Spark',
            'Applications.member.1.Version': '2.4.2',
            'Configurations.member.1.Classification': 'yarn-site',
            'Configurations.member.1.Properties.entry.1.key': 'someproperty',
            'Configurations.member.1.Properties.entry.1.value': 'somevalue',
            'Instances.EmrManagedMasterSecurityGroup': 'master-security-group',
            'Instances.Ec2SubnetId': 'subnet-8be41cec',
        },
        availability_zone='us-east-2b',
        ec2_keyname='mykey',
        job_flow_role='EMR_EC2_DefaultRole',
        keep_alive=False,
        log_uri='s3://some_bucket/jobflow_logs',
        name='My jobflow',
        service_role='EMR_DefaultRole',
        visible_to_all_users=True,
    )
    cluster_id = conn.run_jobflow(**args)
    input_tags = {'tag1': 'val1', 'tag2': 'val2'}
    conn.add_tags(cluster_id, input_tags)

    cluster = conn.describe_cluster(cluster_id)

    spark = cluster.applications[0]
    spark.name.should.equal('Spark')
    spark.version.should.equal('2.4.2')
    cluster.autoterminate.should.equal('true')

    # Configurations do not appear to be supplied as attributes?

    ec2_attrs = cluster.ec2instanceattributes
    # AdditionalMasterSecurityGroups
    # AdditionalSlaveSecurityGroups
    ec2_attrs.ec2availabilityzone.should.equal(args['availability_zone'])
    ec2_attrs.ec2keyname.should.equal(args['ec2_keyname'])
    ec2_attrs.ec2subnetid.should.equal(
        args['api_params']['Instances.Ec2SubnetId'])
    # EmrManagedMasterSecurityGroups
    # EmrManagedSlaveSecurityGroups
    ec2_attrs.iaminstanceprofile.should.equal(args['job_flow_role'])
    # ServiceAccessSecurityGroup

    cluster.id.should.equal(cluster_id)
    cluster.loguri.should.equal(args['log_uri'])
    cluster.masterpublicdnsname.should.be.a(six.string_types)
    cluster.name.should.equal(args['name'])
    int(cluster.normalizedinstancehours).should.equal(0)
    # cluster.release_label
    cluster.shouldnt.have.property('requestedamiversion')
    cluster.runningamiversion.should.equal('1.0.0')
    # cluster.securityconfiguration
    cluster.servicerole.should.equal(args['service_role'])

    status = cluster.status
    status.state.should.equal('TERMINATED')
    status.statechangereason.message.should.be.a(six.string_types)
    status.statechangereason.code.should.be.a(six.string_types)
    status.timeline.creationdatetime.should.be.a(six.string_types)
    # cluster.status.timeline.enddatetime.should.be.a(six.string_types)
    # cluster.status.timeline.readydatetime.should.be.a(six.string_types)

    reported_tags = {tag.key: tag.value for tag in cluster.tags}
    reported_tags.should.equal(input_tags)

    cluster.terminationprotected.should.equal('false')
    cluster.visibletoallusers.should.equal('true')


# describe_jobflows lists every job flow and can filter by id;
# describe_jobflow fetches a single one.
@mock_emr
def test_describe_jobflows():
    conn = boto.connect_emr()
    first_id = conn.run_jobflow(**run_jobflow_args)
    second_id = conn.run_jobflow(**run_jobflow_args)

    everything = conn.describe_jobflows()
    everything.should.have.length_of(2)

    only_second = conn.describe_jobflows(jobflow_ids=[second_id])
    only_second.should.have.length_of(1)
    only_second[0].jobflowid.should.equal(second_id)

    first_job = conn.describe_jobflow(first_id)
    first_job.jobflowid.should.equal(first_id)


# describe_jobflow should report the attributes of a kept-alive, two-instance
# job flow, including its per-instance-group details.
@mock_emr
def test_describe_jobflow():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args.update(
        ami_version='3.8.1',
        api_params={
            #'Applications.member.1.Name': 'Spark',
            #'Applications.member.1.Version': '2.4.2',
            #'Configurations.member.1.Classification': 'yarn-site',
            #'Configurations.member.1.Properties.entry.1.key': 'someproperty',
            #'Configurations.member.1.Properties.entry.1.value': 'somevalue',
            #'Instances.EmrManagedMasterSecurityGroup': 'master-security-group',
            'Instances.Ec2SubnetId': 'subnet-8be41cec',
        },
        ec2_keyname='mykey',
        hadoop_version='2.4.0',
        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        keep_alive=True,
        master_instance_type='c1.medium',
        slave_instance_type='c1.medium',
        num_instances=2,
        availability_zone='us-west-2b',
        job_flow_role='EMR_EC2_DefaultRole',
        service_role='EMR_DefaultRole',
        visible_to_all_users=True,
    )

    cluster_id = conn.run_jobflow(**args)
    jf = conn.describe_jobflow(cluster_id)

    jf.amiversion.should.equal(args['ami_version'])
    jf.bootstrapactions.should.equal(None)
    jf.creationdatetime.should.be.a(six.string_types)
    jf.should.have.property('laststatechangereason')
    jf.readydatetime.should.be.a(six.string_types)
    jf.startdatetime.should.be.a(six.string_types)
    jf.state.should.equal('WAITING')

    jf.ec2keyname.should.equal(args['ec2_keyname'])
    # Ec2SubnetId
    jf.hadoopversion.should.equal(args['hadoop_version'])
    int(jf.instancecount).should.equal(2)

    for group in jf.instancegroups:
        group.creationdatetime.should.be.a(six.string_types)
        # ig.enddatetime.should.be.a(six.string_types)
        group.should.have.property('instancegroupid').being.a(
            six.string_types)
        int(group.instancerequestcount).should.equal(1)
        group.instancerole.should.be.within(['MASTER', 'CORE'])
        int(group.instancerunningcount).should.equal(1)
        group.instancetype.should.equal('c1.medium')
        group.laststatechangereason.should.be.a(six.string_types)
        group.market.should.equal('ON_DEMAND')
        group.name.should.be.a(six.string_types)
        group.readydatetime.should.be.a(six.string_types)
        group.startdatetime.should.be.a(six.string_types)
        group.state.should.equal('RUNNING')

    jf.keepjobflowalivewhennosteps.should.equal('true')
    jf.masterinstanceid.should.be.a(six.string_types)
    jf.masterinstancetype.should.equal(args['master_instance_type'])
    jf.masterpublicdnsname.should.be.a(six.string_types)
    int(jf.normalizedinstancehours).should.equal(0)
    jf.availabilityzone.should.equal(args['availability_zone'])
    jf.slaveinstancetype.should.equal(args['slave_instance_type'])
    jf.terminationprotected.should.equal('false')

    jf.jobflowid.should.equal(cluster_id)
    # jf.jobflowrole.should.equal(args['job_flow_role'])
    jf.loguri.should.equal(args['log_uri'])
    jf.name.should.equal(args['name'])
    # jf.servicerole.should.equal(args['service_role'])

    jf.steps.should.have.length_of(0)

    products = [product.value for product in jf.supported_products]
    products.should.equal([])
    jf.visibletoallusers.should.equal('true')


# list_clusters should return both a waiting and a terminated cluster, and
# only the terminated one carries an end timestamp.
@mock_emr
def test_list_clusters():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args['name'] = 'jobflow1'
    cluster1_id = conn.run_jobflow(**args)
    args['name'] = 'jobflow2'
    cluster2_id = conn.run_jobflow(**args)
    conn.terminate_jobflow(cluster2_id)

    clusters = conn.list_clusters().clusters
    clusters.should.have.length_of(2)

    wanted_by_id = {
        cluster1_id: {
            'id': cluster1_id,
            'name': 'jobflow1',
            'normalizedinstancehours': 0,
            'state': 'WAITING',
        },
        cluster2_id: {
            'id': cluster2_id,
            'name': 'jobflow2',
            'normalizedinstancehours': 0,
            'state': 'TERMINATED',
        },
    }

    for cluster in clusters:
        wanted = wanted_by_id[cluster.id]
        cluster.id.should.equal(wanted['id'])
        cluster.name.should.equal(wanted['name'])
        int(cluster.normalizedinstancehours).should.equal(
            wanted['normalizedinstancehours'])
        cluster.status.state.should.equal(wanted['state'])

        timeline = cluster.status.timeline
        timeline.creationdatetime.should.be.a(six.string_types)
        if wanted['state'] == 'TERMINATED':
            timeline.enddatetime.should.be.a(six.string_types)
        else:
            timeline.shouldnt.have.property('enddatetime')
        timeline.readydatetime.should.be.a(six.string_types)


# A job flow started with the baseline arguments is WAITING, has no steps and
# is not visible to all users.
@mock_emr
def test_run_jobflow():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    job_id = conn.run_jobflow(**args)

    job_flow = conn.describe_jobflow(job_id)
    job_flow.state.should.equal('WAITING')
    job_flow.jobflowid.should.equal(job_id)
    job_flow.name.should.equal(args['name'])
    job_flow.masterinstancetype.should.equal(args['master_instance_type'])
    job_flow.slaveinstancetype.should.equal(args['slave_instance_type'])
    job_flow.loguri.should.equal(args['log_uri'])
    job_flow.visibletoallusers.should.equal('false')
    int(job_flow.normalizedinstancehours).should.equal(0)
    job_flow.steps.should.have.length_of(0)


# Job flows started through different regional connections can each be
# described again through the connection that created them.
@mock_emr
def test_run_jobflow_in_multiple_regions():
    started = {}
    for region in ['us-east-1', 'eu-west-1']:
        conn = boto.emr.connect_to_region(region)
        args = run_jobflow_args.copy()
        args['name'] = region
        cluster_id = conn.run_jobflow(**args)
        started[region] = {'conn': conn, 'cluster_id': cluster_id}

    for region, info in started.items():
        jf = info['conn'].describe_jobflow(info['cluster_id'])
        jf.name.should.equal(region)


@requires_boto_gte("2.8")
@mock_emr
def test_run_jobflow_with_new_params():
    # Test that run_jobflow works with newer params
    conn = boto.connect_emr()
    conn.run_jobflow(**run_jobflow_args)


# visible_to_all_users passed to run_jobflow is reported back as the
# lower-cased string form of the boolean.
@requires_boto_gte("2.8")
@mock_emr
def test_run_jobflow_with_visible_to_all_users():
    conn = boto.connect_emr()
    for visible in (True, False):
        job_id = conn.run_jobflow(
            visible_to_all_users=visible,
            **run_jobflow_args
        )
        job_flow = conn.describe_jobflow(job_id)
        job_flow.visibletoallusers.should.equal(str(visible).lower())


# Instance groups passed to run_jobflow are reported back by describe_jobflow
# with matching counts, roles, types, markets and (for SPOT) bid prices.
@requires_boto_gte("2.8")
@mock_emr
def test_run_jobflow_with_instance_groups():
    groups_by_name = {g.name: g for g in input_instance_groups}

    conn = boto.connect_emr()
    job_id = conn.run_jobflow(instance_groups=input_instance_groups,
                              **run_jobflow_args)
    job_flow = conn.describe_jobflow(job_id)

    total_instances = sum(g.num_instances for g in input_instance_groups)
    int(job_flow.instancecount).should.equal(total_instances)

    for reported in job_flow.instancegroups:
        wanted = groups_by_name[reported.name]
        reported.should.have.property('instancegroupid')
        int(reported.instancerunningcount).should.equal(wanted.num_instances)
        reported.instancerole.should.equal(wanted.role)
        reported.instancetype.should.equal(wanted.type)
        reported.market.should.equal(wanted.market)
        # Only some of the input groups carry a bidprice attribute.
        if hasattr(wanted, 'bidprice'):
            reported.bidprice.should.equal(wanted.bidprice)


# Termination protection starts off and follows set_termination_protection.
@requires_boto_gte("2.8")
@mock_emr
def test_set_termination_protection():
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(**run_jobflow_args)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.terminationprotected.should.equal('false')

    for flag, reported in ((True, 'true'), (False, 'false')):
        conn.set_termination_protection(job_id, flag)
        job_flow = conn.describe_jobflow(job_id)
        job_flow.terminationprotected.should.equal(reported)


# Visibility starts as requested at launch and follows
# set_visible_to_all_users.
@requires_boto_gte("2.8")
@mock_emr
def test_set_visible_to_all_users():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args['visible_to_all_users'] = False
    job_id = conn.run_jobflow(**args)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.visibletoallusers.should.equal('false')

    for flag, reported in ((True, 'true'), (False, 'false')):
        conn.set_visible_to_all_users(job_id, flag)
        job_flow = conn.describe_jobflow(job_id)
        job_flow.visibletoallusers.should.equal(reported)


# terminate_jobflow moves a WAITING job flow to TERMINATED.
@mock_emr
def test_terminate_jobflow():
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(**run_jobflow_args)
    conn.describe_jobflows()[0].state.should.equal('WAITING')

    conn.terminate_jobflow(job_id)
    conn.describe_jobflows()[0].state.should.equal('TERMINATED')


# testing multiple end points for each feature


# Bootstrap actions given at launch are visible through both describe_jobflow
# and list_bootstrap_actions.
@mock_emr
def test_bootstrap_actions():
    bootstrap_actions = [
        BootstrapAction(
            name='bs1',
            path='path/to/script',
            bootstrap_action_args=['arg1', 'arg2']),
        BootstrapAction(
            name='bs2',
            path='path/to/anotherscript',
            bootstrap_action_args=[]),
    ]

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(
        bootstrap_actions=bootstrap_actions,
        **run_jobflow_args
    )

    jf = conn.describe_jobflow(cluster_id)
    for reported, wanted in zip(jf.bootstrapactions, bootstrap_actions):
        reported.name.should.equal(wanted.name)
        reported.path.should.equal(wanted.path)
        reported_args = [a.value for a in reported.args]
        reported_args.should.equal(wanted.args())

    resp = conn.list_bootstrap_actions(cluster_id)
    for index, wanted in enumerate(bootstrap_actions):
        reported = resp.actions[index]
        reported.name.should.equal(wanted.name)
        reported.scriptpath.should.equal(wanted.path)
        reported_args = [a.value for a in reported.args]
        reported_args.should.equal(wanted.args())


# Instance groups can be supplied at launch, added afterwards, listed through
# both describe_jobflow and list_instance_groups, and then resized.
@mock_emr
def test_instance_groups():
    groups_by_name = {g.name: g for g in input_instance_groups}

    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    # Drop the sizing arguments and pass explicit instance groups instead.
    for key in ['master_instance_type', 'slave_instance_type', 'num_instances']:
        args.pop(key)
    args['instance_groups'] = input_instance_groups[:2]
    job_id = conn.run_jobflow(**args)

    jf = conn.describe_jobflow(job_id)
    base_instance_count = int(jf.instancecount)

    conn.add_instance_groups(job_id, input_instance_groups[2:])

    jf = conn.describe_jobflow(job_id)
    total_instances = sum(g.num_instances for g in input_instance_groups)
    int(jf.instancecount).should.equal(total_instances)

    for reported in jf.instancegroups:
        wanted = groups_by_name[reported.name]
        if hasattr(wanted, 'bidprice'):
            reported.bidprice.should.equal(wanted.bidprice)
        reported.creationdatetime.should.be.a(six.string_types)
        # x.enddatetime.should.be.a(six.string_types)
        reported.should.have.property('instancegroupid')
        int(reported.instancerequestcount).should.equal(wanted.num_instances)
        reported.instancerole.should.equal(wanted.role)
        int(reported.instancerunningcount).should.equal(wanted.num_instances)
        reported.instancetype.should.equal(wanted.type)
        reported.laststatechangereason.should.be.a(six.string_types)
        reported.market.should.equal(wanted.market)
        reported.name.should.be.a(six.string_types)
        reported.readydatetime.should.be.a(six.string_types)
        reported.startdatetime.should.be.a(six.string_types)
        reported.state.should.equal('RUNNING')

    for reported in conn.list_instance_groups(job_id).instancegroups:
        wanted = groups_by_name[reported.name]
        if hasattr(wanted, 'bidprice'):
            reported.bidprice.should.equal(wanted.bidprice)
        # Configurations
        # EbsBlockDevices
        # EbsOptimized
        reported.should.have.property('id')
        reported.instancegrouptype.should.equal(wanted.role)
        reported.instancetype.should.equal(wanted.type)
        reported.market.should.equal(wanted.market)
        reported.name.should.equal(wanted.name)
        int(reported.requestedinstancecount).should.equal(
            wanted.num_instances)
        int(reported.runninginstancecount).should.equal(wanted.num_instances)
        # ShrinkPolicy
        status = reported.status
        status.state.should.equal('RUNNING')
        status.statechangereason.code.should.be.a(six.string_types)
        status.statechangereason.message.should.be.a(six.string_types)
        status.timeline.creationdatetime.should.be.a(six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        status.timeline.readydatetime.should.be.a(six.string_types)

    igs = {g.name: g for g in jf.instancegroups}

    # Resize the two task groups to 2 and 3 instances respectively.
    conn.modify_instance_groups(
        [igs['task-1'].instancegroupid, igs['task-2'].instancegroupid],
        [2, 3])
    jf = conn.describe_jobflow(job_id)
    int(jf.instancecount).should.equal(base_instance_count + 5)
    igs = {g.name: g for g in jf.instancegroups}
    int(igs['task-1'].instancerunningcount).should.equal(2)
    int(igs['task-2'].instancerunningcount).should.equal(3)


# Steps can be supplied at launch and added later, and are reported
# consistently by describe_jobflow, list_steps and describe_step.
@mock_emr
def test_steps():
    streaming_jar = '/home/hadoop/contrib/streaming/hadoop-streaming.jar'
    pending_states = ['STARTING', 'PENDING']

    input_steps = [
        StreamingStep(
            name='My wordcount example',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input',
            output='s3n://output_bucket/output/wordcount_output'),
        StreamingStep(
            name='My wordcount example2',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input2',
            output='s3n://output_bucket/output/wordcount_output2'),
    ]

    # TODO: implementation and test for cancel_steps

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(
        steps=[input_steps[0]],
        **run_jobflow_args)

    jf = conn.describe_jobflow(cluster_id)
    jf.steps.should.have.length_of(1)

    conn.add_jobflow_steps(cluster_id, [input_steps[1]])

    jf = conn.describe_jobflow(cluster_id)
    jf.steps.should.have.length_of(2)
    for step in jf.steps:
        step.actiononfailure.should.equal('TERMINATE_JOB_FLOW')
        step_args = [arg.value for arg in step.args]
        step_args.should.have.length_of(8)
        step.creationdatetime.should.be.a(six.string_types)
        # step.enddatetime.should.be.a(six.string_types)
        step.jar.should.equal(streaming_jar)
        step.laststatechangereason.should.be.a(six.string_types)
        step.mainclass.should.equal('')
        step.name.should.be.a(six.string_types)
        # step.readydatetime.should.be.a(six.string_types)
        # step.startdatetime.should.be.a(six.string_types)
        step.state.should.be.within(pending_states)

    steps_by_name = {s.name: s for s in input_steps}

    for listed in conn.list_steps(cluster_id).steps:
        wanted = steps_by_name[listed.name]
        wanted_args = [
            '-mapper', wanted.mapper,
            '-reducer', wanted.reducer,
            '-input', wanted.input,
            '-output', wanted.output,
        ]

        # actiononfailure
        listed_args = [arg.value for arg in listed.config.args]
        listed_args.should.equal(wanted_args)
        listed.config.jar.should.equal(streaming_jar)
        listed.config.mainclass.should.equal('')
        # properties
        listed.should.have.property('id').should.be.a(six.string_types)
        listed.name.should.equal(wanted.name)
        listed.status.state.should.be.within(pending_states)
        # x.status.statechangereason
        listed.status.timeline.creationdatetime.should.be.a(six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        # x.status.timeline.startdatetime.should.be.a(six.string_types)

        # The same step fetched individually must report the same details.
        described = conn.describe_step(cluster_id, listed.id)
        described_args = [arg.value for arg in described.config.args]
        described_args.should.equal(wanted_args)
        described.config.jar.should.equal(streaming_jar)
        described.config.mainclass.should.equal('')
        # properties
        described.should.have.property('id').should.be.a(six.string_types)
        described.name.should.equal(wanted.name)
        described.status.state.should.be.within(pending_states)
        # x.status.statechangereason
        described.status.timeline.creationdatetime.should.be.a(
            six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        # x.status.timeline.startdatetime.should.be.a(six.string_types)


# Tags added with add_tags show up on describe_cluster and disappear again
# after remove_tags.
@mock_emr
def test_tags():
    input_tags = {"tag1": "val1", "tag2": "val2"}

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(**run_jobflow_args)

    conn.add_tags(cluster_id, input_tags)
    cluster = conn.describe_cluster(cluster_id)
    cluster.tags.should.have.length_of(2)
    reported_tags = {t.key: t.value for t in cluster.tags}
    reported_tags.should.equal(input_tags)

    conn.remove_tags(cluster_id, list(input_tags.keys()))
    cluster = conn.describe_cluster(cluster_id)
    cluster.tags.should.have.length_of(0)