I had an EMR step that contained a `&`, which caused the ListSteps call to fail. I've added the `| escape` filter to handle that case, along with a few other fields that look like they could suffer the same fate.
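For context, the failure mode is that a literal `&` in a step attribute produces invalid XML in the response body, so boto can't parse the ListSteps result. Escaping the interpolated values in the response templates fixes that. The snippet below is only an illustrative sketch of the behaviour — the `<Name>` element and the `step.name` variable are made up for the example, not the exact template text in this repo:

```python
from jinja2 import Template

# Hypothetical fragment of an XML response template, before and after the fix.
unescaped = Template('<Name>{{ step.name }}</Name>')
escaped = Template('<Name>{{ step.name | escape }}</Name>')

step = {'name': 'My wordcount example & co.'}
print(unescaped.render(step=step))  # <Name>My wordcount example & co.</Name>  -- bare '&' breaks XML parsing
print(escaped.render(step=step))    # <Name>My wordcount example &amp; co.</Name>  -- parses cleanly
```

The tests below exercise this end to end: one step name contains `&` and one bootstrap action argument contains `&`.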
from __future__ import unicode_literals
import time
from datetime import datetime

import boto
import pytz
from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep

import six
import sure  # noqa

from moto import mock_emr_deprecated
from tests.helpers import requires_boto_gte


run_jobflow_args = dict(
    job_flow_role='EMR_EC2_DefaultRole',
    keep_alive=True,
    log_uri='s3://some_bucket/jobflow_logs',
    master_instance_type='c1.medium',
    name='My jobflow',
    num_instances=2,
    service_role='EMR_DefaultRole',
    slave_instance_type='c1.medium',
)


input_instance_groups = [
    InstanceGroup(1, 'MASTER', 'c1.medium', 'ON_DEMAND', 'master'),
    InstanceGroup(3, 'CORE', 'c1.medium', 'ON_DEMAND', 'core'),
    InstanceGroup(6, 'TASK', 'c1.large', 'SPOT', 'task-1', '0.07'),
    InstanceGroup(10, 'TASK', 'c1.xlarge', 'SPOT', 'task-2', '0.05'),
]


@mock_emr_deprecated
def test_describe_cluster():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args.update(dict(
        api_params={
            'Applications.member.1.Name': 'Spark',
            'Applications.member.1.Version': '2.4.2',
            'Configurations.member.1.Classification': 'yarn-site',
            'Configurations.member.1.Properties.entry.1.key': 'someproperty',
            'Configurations.member.1.Properties.entry.1.value': 'somevalue',
            'Configurations.member.1.Properties.entry.2.key': 'someotherproperty',
            'Configurations.member.1.Properties.entry.2.value': 'someothervalue',
            'Instances.EmrManagedMasterSecurityGroup': 'master-security-group',
            'Instances.Ec2SubnetId': 'subnet-8be41cec',
        },
        availability_zone='us-east-2b',
        ec2_keyname='mykey',
        job_flow_role='EMR_EC2_DefaultRole',
        keep_alive=False,
        log_uri='s3://some_bucket/jobflow_logs',
        name='My jobflow',
        service_role='EMR_DefaultRole',
        visible_to_all_users=True,
    ))
    cluster_id = conn.run_jobflow(**args)
    input_tags = {'tag1': 'val1', 'tag2': 'val2'}
    conn.add_tags(cluster_id, input_tags)

    cluster = conn.describe_cluster(cluster_id)
    cluster.applications[0].name.should.equal('Spark')
    cluster.applications[0].version.should.equal('2.4.2')
    cluster.autoterminate.should.equal('true')

    # configurations appear not to be supplied as attributes?

    attrs = cluster.ec2instanceattributes
    # AdditionalMasterSecurityGroups
    # AdditionalSlaveSecurityGroups
    attrs.ec2availabilityzone.should.equal(args['availability_zone'])
    attrs.ec2keyname.should.equal(args['ec2_keyname'])
    attrs.ec2subnetid.should.equal(args['api_params']['Instances.Ec2SubnetId'])
    # EmrManagedMasterSecurityGroups
    # EmrManagedSlaveSecurityGroups
    attrs.iaminstanceprofile.should.equal(args['job_flow_role'])
    # ServiceAccessSecurityGroup

    cluster.id.should.equal(cluster_id)
    cluster.loguri.should.equal(args['log_uri'])
    cluster.masterpublicdnsname.should.be.a(six.string_types)
    cluster.name.should.equal(args['name'])
    int(cluster.normalizedinstancehours).should.equal(0)
    # cluster.release_label
    cluster.shouldnt.have.property('requestedamiversion')
    cluster.runningamiversion.should.equal('1.0.0')
    # cluster.securityconfiguration
    cluster.servicerole.should.equal(args['service_role'])

    cluster.status.state.should.equal('TERMINATED')
    cluster.status.statechangereason.message.should.be.a(six.string_types)
    cluster.status.statechangereason.code.should.be.a(six.string_types)
    cluster.status.timeline.creationdatetime.should.be.a(six.string_types)
    # cluster.status.timeline.enddatetime.should.be.a(six.string_types)
    # cluster.status.timeline.readydatetime.should.be.a(six.string_types)

    dict((item.key, item.value)
         for item in cluster.tags).should.equal(input_tags)

    cluster.terminationprotected.should.equal('false')
    cluster.visibletoallusers.should.equal('true')


@mock_emr_deprecated
def test_describe_jobflows():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    expected = {}

    for idx in range(4):
        cluster_name = 'cluster' + str(idx)
        args['name'] = cluster_name
        cluster_id = conn.run_jobflow(**args)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': cluster_name,
            'state': 'WAITING'
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    for idx in range(4, 6):
        cluster_name = 'cluster' + str(idx)
        args['name'] = cluster_name
        cluster_id = conn.run_jobflow(**args)
        conn.terminate_jobflow(cluster_id)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': cluster_name,
            'state': 'TERMINATED'
        }
    jobs = conn.describe_jobflows()
    jobs.should.have.length_of(6)

    for cluster_id, y in expected.items():
        resp = conn.describe_jobflows(jobflow_ids=[cluster_id])
        resp.should.have.length_of(1)
        resp[0].jobflowid.should.equal(cluster_id)

    resp = conn.describe_jobflows(states=['WAITING'])
    resp.should.have.length_of(4)
    for x in resp:
        x.state.should.equal('WAITING')

    resp = conn.describe_jobflows(created_before=timestamp)
    resp.should.have.length_of(4)

    resp = conn.describe_jobflows(created_after=timestamp)
    resp.should.have.length_of(2)


@mock_emr_deprecated
def test_describe_jobflow():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args.update(dict(
        ami_version='3.8.1',
        api_params={
            #'Applications.member.1.Name': 'Spark',
            #'Applications.member.1.Version': '2.4.2',
            #'Configurations.member.1.Classification': 'yarn-site',
            #'Configurations.member.1.Properties.entry.1.key': 'someproperty',
            #'Configurations.member.1.Properties.entry.1.value': 'somevalue',
            #'Instances.EmrManagedMasterSecurityGroup': 'master-security-group',
            'Instances.Ec2SubnetId': 'subnet-8be41cec',
        },
        ec2_keyname='mykey',
        hadoop_version='2.4.0',

        name='My jobflow',
        log_uri='s3://some_bucket/jobflow_logs',
        keep_alive=True,
        master_instance_type='c1.medium',
        slave_instance_type='c1.medium',
        num_instances=2,

        availability_zone='us-west-2b',

        job_flow_role='EMR_EC2_DefaultRole',
        service_role='EMR_DefaultRole',
        visible_to_all_users=True,
    ))

    cluster_id = conn.run_jobflow(**args)
    jf = conn.describe_jobflow(cluster_id)
    jf.amiversion.should.equal(args['ami_version'])
    jf.bootstrapactions.should.equal(None)
    jf.creationdatetime.should.be.a(six.string_types)
    jf.should.have.property('laststatechangereason')
    jf.readydatetime.should.be.a(six.string_types)
    jf.startdatetime.should.be.a(six.string_types)
    jf.state.should.equal('WAITING')

    jf.ec2keyname.should.equal(args['ec2_keyname'])
    # Ec2SubnetId
    jf.hadoopversion.should.equal(args['hadoop_version'])
    int(jf.instancecount).should.equal(2)

    for ig in jf.instancegroups:
        ig.creationdatetime.should.be.a(six.string_types)
        # ig.enddatetime.should.be.a(six.string_types)
        ig.should.have.property('instancegroupid').being.a(six.string_types)
        int(ig.instancerequestcount).should.equal(1)
        ig.instancerole.should.be.within(['MASTER', 'CORE'])
        int(ig.instancerunningcount).should.equal(1)
        ig.instancetype.should.equal('c1.medium')
        ig.laststatechangereason.should.be.a(six.string_types)
        ig.market.should.equal('ON_DEMAND')
        ig.name.should.be.a(six.string_types)
        ig.readydatetime.should.be.a(six.string_types)
        ig.startdatetime.should.be.a(six.string_types)
        ig.state.should.equal('RUNNING')

    jf.keepjobflowalivewhennosteps.should.equal('true')
    jf.masterinstanceid.should.be.a(six.string_types)
    jf.masterinstancetype.should.equal(args['master_instance_type'])
    jf.masterpublicdnsname.should.be.a(six.string_types)
    int(jf.normalizedinstancehours).should.equal(0)
    jf.availabilityzone.should.equal(args['availability_zone'])
    jf.slaveinstancetype.should.equal(args['slave_instance_type'])
    jf.terminationprotected.should.equal('false')

    jf.jobflowid.should.equal(cluster_id)
    # jf.jobflowrole.should.equal(args['job_flow_role'])
    jf.loguri.should.equal(args['log_uri'])
    jf.name.should.equal(args['name'])
    # jf.servicerole.should.equal(args['service_role'])

    jf.steps.should.have.length_of(0)

    list(i.value for i in jf.supported_products).should.equal([])
    jf.visibletoallusers.should.equal('true')


@mock_emr_deprecated
def test_list_clusters():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    expected = {}

    for idx in range(40):
        cluster_name = 'jobflow' + str(idx)
        args['name'] = cluster_name
        cluster_id = conn.run_jobflow(**args)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': cluster_name,
            'normalizedinstancehours': '0',
            'state': 'WAITING'
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    for idx in range(40, 70):
        cluster_name = 'jobflow' + str(idx)
        args['name'] = cluster_name
        cluster_id = conn.run_jobflow(**args)
        conn.terminate_jobflow(cluster_id)
        expected[cluster_id] = {
            'id': cluster_id,
            'name': cluster_name,
            'normalizedinstancehours': '0',
            'state': 'TERMINATED'
        }

    args = {}
    while 1:
        resp = conn.list_clusters(**args)
        clusters = resp.clusters
        len(clusters).should.be.lower_than_or_equal_to(50)
        for x in clusters:
            y = expected[x.id]
            x.id.should.equal(y['id'])
            x.name.should.equal(y['name'])
            x.normalizedinstancehours.should.equal(
                y['normalizedinstancehours'])
            x.status.state.should.equal(y['state'])
            x.status.timeline.creationdatetime.should.be.a(six.string_types)
            if y['state'] == 'TERMINATED':
                x.status.timeline.enddatetime.should.be.a(six.string_types)
            else:
                x.status.timeline.shouldnt.have.property('enddatetime')
            x.status.timeline.readydatetime.should.be.a(six.string_types)
        if not hasattr(resp, 'marker'):
            break
        args = {'marker': resp.marker}

    resp = conn.list_clusters(cluster_states=['TERMINATED'])
    resp.clusters.should.have.length_of(30)
    for x in resp.clusters:
        x.status.state.should.equal('TERMINATED')

    resp = conn.list_clusters(created_before=timestamp)
    resp.clusters.should.have.length_of(40)

    resp = conn.list_clusters(created_after=timestamp)
    resp.clusters.should.have.length_of(30)


@mock_emr_deprecated
def test_run_jobflow():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    job_id = conn.run_jobflow(**args)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.state.should.equal('WAITING')
    job_flow.jobflowid.should.equal(job_id)
    job_flow.name.should.equal(args['name'])
    job_flow.masterinstancetype.should.equal(args['master_instance_type'])
    job_flow.slaveinstancetype.should.equal(args['slave_instance_type'])
    job_flow.loguri.should.equal(args['log_uri'])
    job_flow.visibletoallusers.should.equal('false')
    int(job_flow.normalizedinstancehours).should.equal(0)
    job_flow.steps.should.have.length_of(0)


@mock_emr_deprecated
def test_run_jobflow_in_multiple_regions():
    regions = {}
    for region in ['us-east-1', 'eu-west-1']:
        conn = boto.emr.connect_to_region(region)
        args = run_jobflow_args.copy()
        args['name'] = region
        cluster_id = conn.run_jobflow(**args)
        regions[region] = {'conn': conn, 'cluster_id': cluster_id}

    for region in regions.keys():
        conn = regions[region]['conn']
        jf = conn.describe_jobflow(regions[region]['cluster_id'])
        jf.name.should.equal(region)


@requires_boto_gte("2.8")
@mock_emr_deprecated
def test_run_jobflow_with_new_params():
    # Test that run_jobflow works with newer params
    conn = boto.connect_emr()
    conn.run_jobflow(**run_jobflow_args)


@requires_boto_gte("2.8")
@mock_emr_deprecated
def test_run_jobflow_with_visible_to_all_users():
    conn = boto.connect_emr()
    for expected in (True, False):
        job_id = conn.run_jobflow(
            visible_to_all_users=expected,
            **run_jobflow_args
        )
        job_flow = conn.describe_jobflow(job_id)
        job_flow.visibletoallusers.should.equal(str(expected).lower())


@requires_boto_gte("2.8")
@mock_emr_deprecated
def test_run_jobflow_with_instance_groups():
    input_groups = dict((g.name, g) for g in input_instance_groups)
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(instance_groups=input_instance_groups,
                              **run_jobflow_args)
    job_flow = conn.describe_jobflow(job_id)
    int(job_flow.instancecount).should.equal(
        sum(g.num_instances for g in input_instance_groups))
    for instance_group in job_flow.instancegroups:
        expected = input_groups[instance_group.name]
        instance_group.should.have.property('instancegroupid')
        int(instance_group.instancerunningcount).should.equal(
            expected.num_instances)
        instance_group.instancerole.should.equal(expected.role)
        instance_group.instancetype.should.equal(expected.type)
        instance_group.market.should.equal(expected.market)
        if hasattr(expected, 'bidprice'):
            instance_group.bidprice.should.equal(expected.bidprice)


@requires_boto_gte("2.8")
@mock_emr_deprecated
def test_set_termination_protection():
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(**run_jobflow_args)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.terminationprotected.should.equal('false')

    conn.set_termination_protection(job_id, True)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.terminationprotected.should.equal('true')

    conn.set_termination_protection(job_id, False)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.terminationprotected.should.equal('false')


@requires_boto_gte("2.8")
@mock_emr_deprecated
def test_set_visible_to_all_users():
    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    args['visible_to_all_users'] = False
    job_id = conn.run_jobflow(**args)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.visibletoallusers.should.equal('false')

    conn.set_visible_to_all_users(job_id, True)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.visibletoallusers.should.equal('true')

    conn.set_visible_to_all_users(job_id, False)
    job_flow = conn.describe_jobflow(job_id)
    job_flow.visibletoallusers.should.equal('false')


@mock_emr_deprecated
def test_terminate_jobflow():
    conn = boto.connect_emr()
    job_id = conn.run_jobflow(**run_jobflow_args)
    flow = conn.describe_jobflows()[0]
    flow.state.should.equal('WAITING')

    conn.terminate_jobflow(job_id)
    flow = conn.describe_jobflows()[0]
    flow.state.should.equal('TERMINATED')


# testing multiple end points for each feature

@mock_emr_deprecated
def test_bootstrap_actions():
    bootstrap_actions = [
        BootstrapAction(
            name='bs1',
            path='path/to/script',
            bootstrap_action_args=['arg1', 'arg2&arg3']),
        BootstrapAction(
            name='bs2',
            path='path/to/anotherscript',
            bootstrap_action_args=[])
    ]

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(
        bootstrap_actions=bootstrap_actions,
        **run_jobflow_args
    )

    jf = conn.describe_jobflow(cluster_id)
    for x, y in zip(jf.bootstrapactions, bootstrap_actions):
        x.name.should.equal(y.name)
        x.path.should.equal(y.path)
        list(o.value for o in x.args).should.equal(y.args())

    resp = conn.list_bootstrap_actions(cluster_id)
    for i, y in enumerate(bootstrap_actions):
        x = resp.actions[i]
        x.name.should.equal(y.name)
        x.scriptpath.should.equal(y.path)
        list(arg.value for arg in x.args).should.equal(y.args())


@mock_emr_deprecated
def test_instance_groups():
    input_groups = dict((g.name, g) for g in input_instance_groups)

    conn = boto.connect_emr()
    args = run_jobflow_args.copy()
    for key in ['master_instance_type', 'slave_instance_type', 'num_instances']:
        del args[key]
    args['instance_groups'] = input_instance_groups[:2]
    job_id = conn.run_jobflow(**args)

    jf = conn.describe_jobflow(job_id)
    base_instance_count = int(jf.instancecount)

    conn.add_instance_groups(job_id, input_instance_groups[2:])

    jf = conn.describe_jobflow(job_id)
    int(jf.instancecount).should.equal(
        sum(g.num_instances for g in input_instance_groups))
    for x in jf.instancegroups:
        y = input_groups[x.name]
        if hasattr(y, 'bidprice'):
            x.bidprice.should.equal(y.bidprice)
        x.creationdatetime.should.be.a(six.string_types)
        # x.enddatetime.should.be.a(six.string_types)
        x.should.have.property('instancegroupid')
        int(x.instancerequestcount).should.equal(y.num_instances)
        x.instancerole.should.equal(y.role)
        int(x.instancerunningcount).should.equal(y.num_instances)
        x.instancetype.should.equal(y.type)
        x.laststatechangereason.should.be.a(six.string_types)
        x.market.should.equal(y.market)
        x.name.should.be.a(six.string_types)
        x.readydatetime.should.be.a(six.string_types)
        x.startdatetime.should.be.a(six.string_types)
        x.state.should.equal('RUNNING')

    for x in conn.list_instance_groups(job_id).instancegroups:
        y = input_groups[x.name]
        if hasattr(y, 'bidprice'):
            x.bidprice.should.equal(y.bidprice)
        # Configurations
        # EbsBlockDevices
        # EbsOptimized
        x.should.have.property('id')
        x.instancegrouptype.should.equal(y.role)
        x.instancetype.should.equal(y.type)
        x.market.should.equal(y.market)
        x.name.should.equal(y.name)
        int(x.requestedinstancecount).should.equal(y.num_instances)
        int(x.runninginstancecount).should.equal(y.num_instances)
        # ShrinkPolicy
        x.status.state.should.equal('RUNNING')
        x.status.statechangereason.code.should.be.a(six.string_types)
        x.status.statechangereason.message.should.be.a(six.string_types)
        x.status.timeline.creationdatetime.should.be.a(six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        x.status.timeline.readydatetime.should.be.a(six.string_types)

    igs = dict((g.name, g) for g in jf.instancegroups)

    conn.modify_instance_groups(
        [igs['task-1'].instancegroupid, igs['task-2'].instancegroupid],
        [2, 3])
    jf = conn.describe_jobflow(job_id)
    int(jf.instancecount).should.equal(base_instance_count + 5)
    igs = dict((g.name, g) for g in jf.instancegroups)
    int(igs['task-1'].instancerunningcount).should.equal(2)
    int(igs['task-2'].instancerunningcount).should.equal(3)


@mock_emr_deprecated
def test_steps():
    input_steps = [
        StreamingStep(
            name='My wordcount example',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input',
            output='s3n://output_bucket/output/wordcount_output'),
        StreamingStep(
            name='My wordcount example & co.',
            mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter2.py',
            reducer='aggregate',
            input='s3n://elasticmapreduce/samples/wordcount/input2',
            output='s3n://output_bucket/output/wordcount_output2')
    ]

    # TODO: implementation and test for cancel_steps

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(
        steps=[input_steps[0]],
        **run_jobflow_args)

    jf = conn.describe_jobflow(cluster_id)
    jf.steps.should.have.length_of(1)

    conn.add_jobflow_steps(cluster_id, [input_steps[1]])

    jf = conn.describe_jobflow(cluster_id)
    jf.steps.should.have.length_of(2)
    for step in jf.steps:
        step.actiononfailure.should.equal('TERMINATE_JOB_FLOW')
        list(arg.value for arg in step.args).should.have.length_of(8)
        step.creationdatetime.should.be.a(six.string_types)
        # step.enddatetime.should.be.a(six.string_types)
        step.jar.should.equal(
            '/home/hadoop/contrib/streaming/hadoop-streaming.jar')
        step.laststatechangereason.should.be.a(six.string_types)
        step.mainclass.should.equal('')
        step.name.should.be.a(six.string_types)
        # step.readydatetime.should.be.a(six.string_types)
        # step.startdatetime.should.be.a(six.string_types)
        step.state.should.be.within(['STARTING', 'PENDING'])

    expected = dict((s.name, s) for s in input_steps)

    steps = conn.list_steps(cluster_id).steps
    for x in steps:
        y = expected[x.name]
        # actiononfailure
        list(arg.value for arg in x.config.args).should.equal([
            '-mapper', y.mapper,
            '-reducer', y.reducer,
            '-input', y.input,
            '-output', y.output,
        ])
        x.config.jar.should.equal(
            '/home/hadoop/contrib/streaming/hadoop-streaming.jar')
        x.config.mainclass.should.equal('')
        # properties
        x.should.have.property('id').should.be.a(six.string_types)
        x.name.should.equal(y.name)
        x.status.state.should.be.within(['STARTING', 'PENDING'])
        # x.status.statechangereason
        x.status.timeline.creationdatetime.should.be.a(six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        # x.status.timeline.startdatetime.should.be.a(six.string_types)

        x = conn.describe_step(cluster_id, x.id)
        list(arg.value for arg in x.config.args).should.equal([
            '-mapper', y.mapper,
            '-reducer', y.reducer,
            '-input', y.input,
            '-output', y.output,
        ])
        x.config.jar.should.equal(
            '/home/hadoop/contrib/streaming/hadoop-streaming.jar')
        x.config.mainclass.should.equal('')
        # properties
        x.should.have.property('id').should.be.a(six.string_types)
        x.name.should.equal(y.name)
        x.status.state.should.be.within(['STARTING', 'PENDING'])
        # x.status.statechangereason
        x.status.timeline.creationdatetime.should.be.a(six.string_types)
        # x.status.timeline.enddatetime.should.be.a(six.string_types)
        # x.status.timeline.startdatetime.should.be.a(six.string_types)

    @requires_boto_gte('2.39')
    def test_list_steps_with_states():
        # boto's list_steps prior to 2.39 has a bug that ignores the
        # step_states argument.
        steps = conn.list_steps(cluster_id).steps
        step_id = steps[0].id
        steps = conn.list_steps(cluster_id, step_states=['STARTING']).steps
        steps.should.have.length_of(1)
        steps[0].id.should.equal(step_id)
    test_list_steps_with_states()


@mock_emr_deprecated
def test_tags():
    input_tags = {"tag1": "val1", "tag2": "val2"}

    conn = boto.connect_emr()
    cluster_id = conn.run_jobflow(**run_jobflow_args)

    conn.add_tags(cluster_id, input_tags)
    cluster = conn.describe_cluster(cluster_id)
    cluster.tags.should.have.length_of(2)
    dict((t.key, t.value) for t in cluster.tags).should.equal(input_tags)

    conn.remove_tags(cluster_id, list(input_tags.keys()))
    cluster = conn.describe_cluster(cluster_id)
    cluster.tags.should.have.length_of(0)