| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | # -*- coding: utf-8 -*- | 
					
						
							|  |  |  | from __future__ import unicode_literals | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | from copy import deepcopy | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | from datetime import datetime | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | import boto3 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | import pytz | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | import six | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | import sure  # noqa | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | from botocore.exceptions import ClientError | 
					
						
							|  |  |  | from nose.tools import assert_raises | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | from moto import mock_emr | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | run_job_flow_args = dict( | 
					
						
							|  |  |  |     Instances={ | 
					
						
							|  |  |  |         'InstanceCount': 3, | 
					
						
							|  |  |  |         'KeepJobFlowAliveWhenNoSteps': True, | 
					
						
							|  |  |  |         'MasterInstanceType': 'c3.medium', | 
					
						
							|  |  |  |         'Placement': {'AvailabilityZone': 'us-east-1a'}, | 
					
						
							|  |  |  |         'SlaveInstanceType': 'c3.xlarge', | 
					
						
							|  |  |  |     }, | 
					
						
							|  |  |  |     JobFlowRole='EMR_EC2_DefaultRole', | 
					
						
							|  |  |  |     LogUri='s3://mybucket/log', | 
					
						
							|  |  |  |     Name='cluster', | 
					
						
							|  |  |  |     ServiceRole='EMR_DefaultRole', | 
					
						
							|  |  |  |     VisibleToAllUsers=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | input_instance_groups = [ | 
					
						
							|  |  |  |     {'InstanceCount': 1, | 
					
						
							|  |  |  |      'InstanceRole': 'MASTER', | 
					
						
							|  |  |  |      'InstanceType': 'c1.medium', | 
					
						
							|  |  |  |      'Market': 'ON_DEMAND', | 
					
						
							|  |  |  |      'Name': 'master'}, | 
					
						
							|  |  |  |     {'InstanceCount': 3, | 
					
						
							|  |  |  |      'InstanceRole': 'CORE', | 
					
						
							|  |  |  |      'InstanceType': 'c1.medium', | 
					
						
							|  |  |  |      'Market': 'ON_DEMAND', | 
					
						
							|  |  |  |      'Name': 'core'}, | 
					
						
							|  |  |  |     {'InstanceCount': 6, | 
					
						
							|  |  |  |      'InstanceRole': 'TASK', | 
					
						
							|  |  |  |      'InstanceType': 'c1.large', | 
					
						
							|  |  |  |      'Market': 'SPOT', | 
					
						
							|  |  |  |      'Name': 'task-1', | 
					
						
							|  |  |  |      'BidPrice': '0.07'}, | 
					
						
							|  |  |  |     {'InstanceCount': 10, | 
					
						
							|  |  |  |      'InstanceRole': 'TASK', | 
					
						
							|  |  |  |      'InstanceType': 'c1.xlarge', | 
					
						
							|  |  |  |      'Market': 'SPOT', | 
					
						
							|  |  |  |      'Name': 'task-2', | 
					
						
							|  |  |  |      'BidPrice': '0.05'}, | 
					
						
							|  |  |  | ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | @mock_emr | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | def test_describe_cluster(): | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['Applications'] = [{'Name': 'Spark', 'Version': '2.4.2'}] | 
					
						
							|  |  |  |     args['Configurations'] = [ | 
					
						
							|  |  |  |         {'Classification': 'yarn-site', | 
					
						
							| 
									
										
										
										
											2016-10-31 11:29:39 -07:00
										 |  |  |          'Properties': {'someproperty': 'somevalue', | 
					
						
							| 
									
										
										
										
											2017-06-27 11:31:43 -07:00
										 |  |  |                         'someotherproperty': 'someothervalue'}}, | 
					
						
							|  |  |  |         {'Classification': 'nested-configs', | 
					
						
							|  |  |  |          'Properties': {}, | 
					
						
							|  |  |  |          'Configurations': [ | 
					
						
							|  |  |  |              { | 
					
						
							|  |  |  |                  'Classification': 'nested-config', | 
					
						
							|  |  |  |                  'Properties': { | 
					
						
							|  |  |  |                      'nested-property': 'nested-value' | 
					
						
							|  |  |  |                  } | 
					
						
							|  |  |  |              } | 
					
						
							|  |  |  |          ]} | 
					
						
							|  |  |  |     ] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args['Instances']['AdditionalMasterSecurityGroups'] = ['additional-master'] | 
					
						
							|  |  |  |     args['Instances']['AdditionalSlaveSecurityGroups'] = ['additional-slave'] | 
					
						
							|  |  |  |     args['Instances']['Ec2KeyName'] = 'mykey' | 
					
						
							|  |  |  |     args['Instances']['Ec2SubnetId'] = 'subnet-8be41cec' | 
					
						
							|  |  |  |     args['Instances']['EmrManagedMasterSecurityGroup'] = 'master-security-group' | 
					
						
							|  |  |  |     args['Instances']['EmrManagedSlaveSecurityGroup'] = 'slave-security-group' | 
					
						
							|  |  |  |     args['Instances']['KeepJobFlowAliveWhenNoSteps'] = False | 
					
						
							|  |  |  |     args['Instances']['ServiceAccessSecurityGroup'] = 'service-access-security-group' | 
					
						
							|  |  |  |     args['Tags'] = [{'Key': 'tag1', 'Value': 'val1'}, | 
					
						
							|  |  |  |                     {'Key': 'tag2', 'Value': 'val2'}] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cl = client.describe_cluster(ClusterId=cluster_id)['Cluster'] | 
					
						
							|  |  |  |     cl['Applications'][0]['Name'].should.equal('Spark') | 
					
						
							|  |  |  |     cl['Applications'][0]['Version'].should.equal('2.4.2') | 
					
						
							|  |  |  |     cl['AutoTerminate'].should.equal(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     config = cl['Configurations'][0] | 
					
						
							|  |  |  |     config['Classification'].should.equal('yarn-site') | 
					
						
							|  |  |  |     config['Properties'].should.equal(args['Configurations'][0]['Properties']) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-27 11:31:43 -07:00
										 |  |  |     nested_config = cl['Configurations'][1] | 
					
						
							|  |  |  |     nested_config['Classification'].should.equal('nested-configs') | 
					
						
							|  |  |  |     nested_config['Properties'].should.equal(args['Configurations'][1]['Properties']) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     attrs = cl['Ec2InstanceAttributes'] | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     attrs['AdditionalMasterSecurityGroups'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['AdditionalMasterSecurityGroups']) | 
					
						
							|  |  |  |     attrs['AdditionalSlaveSecurityGroups'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['AdditionalSlaveSecurityGroups']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     attrs['Ec2AvailabilityZone'].should.equal('us-east-1a') | 
					
						
							|  |  |  |     attrs['Ec2KeyName'].should.equal(args['Instances']['Ec2KeyName']) | 
					
						
							|  |  |  |     attrs['Ec2SubnetId'].should.equal(args['Instances']['Ec2SubnetId']) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     attrs['EmrManagedMasterSecurityGroup'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['EmrManagedMasterSecurityGroup']) | 
					
						
							|  |  |  |     attrs['EmrManagedSlaveSecurityGroup'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['EmrManagedSlaveSecurityGroup']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     attrs['IamInstanceProfile'].should.equal(args['JobFlowRole']) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     attrs['ServiceAccessSecurityGroup'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['ServiceAccessSecurityGroup']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     cl['Id'].should.equal(cluster_id) | 
					
						
							|  |  |  |     cl['LogUri'].should.equal(args['LogUri']) | 
					
						
							|  |  |  |     cl['MasterPublicDnsName'].should.be.a(six.string_types) | 
					
						
							|  |  |  |     cl['Name'].should.equal(args['Name']) | 
					
						
							|  |  |  |     cl['NormalizedInstanceHours'].should.equal(0) | 
					
						
							|  |  |  |     # cl['ReleaseLabel'].should.equal('emr-5.0.0') | 
					
						
							|  |  |  |     cl.shouldnt.have.key('RequestedAmiVersion') | 
					
						
							|  |  |  |     cl['RunningAmiVersion'].should.equal('1.0.0') | 
					
						
							|  |  |  |     # cl['SecurityConfiguration'].should.be.a(six.string_types) | 
					
						
							|  |  |  |     cl['ServiceRole'].should.equal(args['ServiceRole']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     status = cl['Status'] | 
					
						
							|  |  |  |     status['State'].should.equal('TERMINATED') | 
					
						
							|  |  |  |     # cluster['Status']['StateChangeReason'] | 
					
						
							|  |  |  |     status['Timeline']['CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     # status['Timeline']['EndDateTime'].should.equal(datetime(2014, 1, 24, 2, 19, 46, tzinfo=pytz.utc)) | 
					
						
							|  |  |  |     status['Timeline']['ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     dict((t['Key'], t['Value']) for t in cl['Tags']).should.equal( | 
					
						
							|  |  |  |         dict((t['Key'], t['Value']) for t in args['Tags'])) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cl['TerminationProtected'].should.equal(False) | 
					
						
							|  |  |  |     cl['VisibleToAllUsers'].should.equal(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_cluster_not_found(): | 
					
						
							|  |  |  |     conn = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     raised = False | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         cluster = conn.describe_cluster(ClusterId='DummyId') | 
					
						
							|  |  |  |     except ClientError as e: | 
					
						
							|  |  |  |         if e.response['Error']['Code'] == "ResourceNotFoundException": | 
					
						
							|  |  |  |             raised = True | 
					
						
							|  |  |  |     raised.should.equal(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_job_flows(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     expected = {} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     for idx in range(4): | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         cluster_name = 'cluster' + str(idx) | 
					
						
							|  |  |  |         args['Name'] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							|  |  |  |             'Id': cluster_id, | 
					
						
							|  |  |  |             'Name': cluster_name, | 
					
						
							|  |  |  |             'State': 'WAITING' | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # need sleep since it appears the timestamp is always rounded to | 
					
						
							|  |  |  |     # the nearest second internally | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  |     timestamp = datetime.now(pytz.utc) | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     for idx in range(4, 6): | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         cluster_name = 'cluster' + str(idx) | 
					
						
							|  |  |  |         args['Name'] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |         client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							|  |  |  |             'Id': cluster_id, | 
					
						
							|  |  |  |             'Name': cluster_name, | 
					
						
							|  |  |  |             'State': 'TERMINATED' | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows() | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     resp['JobFlows'].should.have.length_of(6) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     for cluster_id, y in expected.items(): | 
					
						
							|  |  |  |         resp = client.describe_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |         resp['JobFlows'].should.have.length_of(1) | 
					
						
							|  |  |  |         resp['JobFlows'][0]['JobFlowId'].should.equal(cluster_id) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     resp = client.describe_job_flows(JobFlowStates=['WAITING']) | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     resp['JobFlows'].should.have.length_of(4) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     for x in resp['JobFlows']: | 
					
						
							|  |  |  |         x['ExecutionStatusDetail']['State'].should.equal('WAITING') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows(CreatedBefore=timestamp) | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     resp['JobFlows'].should.have.length_of(4) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows(CreatedAfter=timestamp) | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     resp['JobFlows'].should.have.length_of(2) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_job_flow(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['AmiVersion'] = '3.8.1' | 
					
						
							|  |  |  |     args['Instances'].update( | 
					
						
							|  |  |  |         {'Ec2KeyName': 'ec2keyname', | 
					
						
							|  |  |  |          'Ec2SubnetId': 'subnet-8be41cec', | 
					
						
							|  |  |  |          'HadoopVersion': '2.4.0'}) | 
					
						
							|  |  |  |     args['VisibleToAllUsers'] = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf['AmiVersion'].should.equal(args['AmiVersion']) | 
					
						
							|  |  |  |     jf.shouldnt.have.key('BootstrapActions') | 
					
						
							|  |  |  |     esd = jf['ExecutionStatusDetail'] | 
					
						
							|  |  |  |     esd['CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     # esd['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     # esd['LastStateChangeReason'].should.be.a(six.string_types) | 
					
						
							|  |  |  |     esd['ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     esd['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     esd['State'].should.equal('WAITING') | 
					
						
							|  |  |  |     attrs = jf['Instances'] | 
					
						
							|  |  |  |     attrs['Ec2KeyName'].should.equal(args['Instances']['Ec2KeyName']) | 
					
						
							|  |  |  |     attrs['Ec2SubnetId'].should.equal(args['Instances']['Ec2SubnetId']) | 
					
						
							|  |  |  |     attrs['HadoopVersion'].should.equal(args['Instances']['HadoopVersion']) | 
					
						
							|  |  |  |     attrs['InstanceCount'].should.equal(args['Instances']['InstanceCount']) | 
					
						
							|  |  |  |     for ig in attrs['InstanceGroups']: | 
					
						
							|  |  |  |         # ig['BidPrice'] | 
					
						
							|  |  |  |         ig['CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # ig['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         ig['InstanceGroupId'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         ig['InstanceRequestCount'].should.be.a(int) | 
					
						
							|  |  |  |         ig['InstanceRole'].should.be.within(['MASTER', 'CORE']) | 
					
						
							|  |  |  |         ig['InstanceRunningCount'].should.be.a(int) | 
					
						
							|  |  |  |         ig['InstanceType'].should.be.within(['c3.medium', 'c3.xlarge']) | 
					
						
							|  |  |  |         # ig['LastStateChangeReason'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         ig['Market'].should.equal('ON_DEMAND') | 
					
						
							|  |  |  |         ig['Name'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         ig['ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         ig['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         ig['State'].should.equal('RUNNING') | 
					
						
							|  |  |  |     attrs['KeepJobFlowAliveWhenNoSteps'].should.equal(True) | 
					
						
							|  |  |  |     # attrs['MasterInstanceId'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     attrs['MasterInstanceType'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['MasterInstanceType']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     attrs['MasterPublicDnsName'].should.be.a(six.string_types) | 
					
						
							|  |  |  |     attrs['NormalizedInstanceHours'].should.equal(0) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     attrs['Placement']['AvailabilityZone'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['Placement']['AvailabilityZone']) | 
					
						
							|  |  |  |     attrs['SlaveInstanceType'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['SlaveInstanceType']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     attrs['TerminationProtected'].should.equal(False) | 
					
						
							|  |  |  |     jf['JobFlowId'].should.equal(cluster_id) | 
					
						
							|  |  |  |     jf['JobFlowRole'].should.equal(args['JobFlowRole']) | 
					
						
							|  |  |  |     jf['LogUri'].should.equal(args['LogUri']) | 
					
						
							|  |  |  |     jf['Name'].should.equal(args['Name']) | 
					
						
							|  |  |  |     jf['ServiceRole'].should.equal(args['ServiceRole']) | 
					
						
							| 
									
										
										
										
											2016-10-16 21:49:10 -07:00
										 |  |  |     jf['Steps'].should.equal([]) | 
					
						
							|  |  |  |     jf['SupportedProducts'].should.equal([]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     jf['VisibleToAllUsers'].should.equal(True) | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_list_clusters(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     expected = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for idx in range(40): | 
					
						
							|  |  |  |         cluster_name = 'jobflow' + str(idx) | 
					
						
							|  |  |  |         args['Name'] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							|  |  |  |             'Id': cluster_id, | 
					
						
							|  |  |  |             'Name': cluster_name, | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |             'NormalizedInstanceHours': 0, | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             'State': 'WAITING' | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # need sleep since it appears the timestamp is always rounded to | 
					
						
							|  |  |  |     # the nearest second internally | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  |     timestamp = datetime.now(pytz.utc) | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for idx in range(40, 70): | 
					
						
							|  |  |  |         cluster_name = 'jobflow' + str(idx) | 
					
						
							|  |  |  |         args['Name'] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |         client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							|  |  |  |             'Id': cluster_id, | 
					
						
							|  |  |  |             'Name': cluster_name, | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |             'NormalizedInstanceHours': 0, | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             'State': 'TERMINATED' | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     args = {} | 
					
						
							|  |  |  |     while 1: | 
					
						
							|  |  |  |         resp = client.list_clusters(**args) | 
					
						
							|  |  |  |         clusters = resp['Clusters'] | 
					
						
							|  |  |  |         len(clusters).should.be.lower_than_or_equal_to(50) | 
					
						
							|  |  |  |         for x in clusters: | 
					
						
							|  |  |  |             y = expected[x['Id']] | 
					
						
							|  |  |  |             x['Id'].should.equal(y['Id']) | 
					
						
							|  |  |  |             x['Name'].should.equal(y['Name']) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |             x['NormalizedInstanceHours'].should.equal( | 
					
						
							|  |  |  |                 y['NormalizedInstanceHours']) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             x['Status']['State'].should.equal(y['State']) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |             x['Status']['Timeline'][ | 
					
						
							|  |  |  |                 'CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             if y['State'] == 'TERMINATED': | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |                 x['Status']['Timeline'][ | 
					
						
							|  |  |  |                     'EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             else: | 
					
						
							|  |  |  |                 x['Status']['Timeline'].shouldnt.have.key('EndDateTime') | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |             x['Status']['Timeline'][ | 
					
						
							|  |  |  |                 'ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         marker = resp.get('Marker') | 
					
						
							|  |  |  |         if marker is None: | 
					
						
							|  |  |  |             break | 
					
						
							|  |  |  |         args = {'Marker': marker} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_clusters(ClusterStates=['TERMINATED']) | 
					
						
							|  |  |  |     resp['Clusters'].should.have.length_of(30) | 
					
						
							|  |  |  |     for x in resp['Clusters']: | 
					
						
							|  |  |  |         x['Status']['State'].should.equal('TERMINATED') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_clusters(CreatedBefore=timestamp) | 
					
						
							|  |  |  |     resp['Clusters'].should.have.length_of(40) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_clusters(CreatedAfter=timestamp) | 
					
						
							|  |  |  |     resp['Clusters'].should.have.length_of(30) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |     resp = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     resp['ExecutionStatusDetail']['State'].should.equal('WAITING') | 
					
						
							|  |  |  |     resp['JobFlowId'].should.equal(cluster_id) | 
					
						
							|  |  |  |     resp['Name'].should.equal(args['Name']) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     resp['Instances']['MasterInstanceType'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['MasterInstanceType']) | 
					
						
							|  |  |  |     resp['Instances']['SlaveInstanceType'].should.equal( | 
					
						
							|  |  |  |         args['Instances']['SlaveInstanceType']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp['LogUri'].should.equal(args['LogUri']) | 
					
						
							|  |  |  |     resp['VisibleToAllUsers'].should.equal(args['VisibleToAllUsers']) | 
					
						
							|  |  |  |     resp['Instances']['NormalizedInstanceHours'].should.equal(0) | 
					
						
							| 
									
										
										
										
											2016-10-16 21:49:10 -07:00
										 |  |  |     resp['Steps'].should.equal([]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_invalid_params(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     with assert_raises(ClientError) as ex: | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # cannot set both AmiVersion and ReleaseLabel | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |         args['AmiVersion'] = '2.4' | 
					
						
							|  |  |  |         args['ReleaseLabel'] = 'emr-5.0.0' | 
					
						
							|  |  |  |         client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2017-02-23 22:28:09 -05:00
										 |  |  |     ex.exception.response['Error']['Code'].should.equal('ValidationException') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_in_multiple_regions(): | 
					
						
							|  |  |  |     regions = {} | 
					
						
							|  |  |  |     for region in ['us-east-1', 'eu-west-1']: | 
					
						
							|  |  |  |         client = boto3.client('emr', region_name=region) | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |         args['Name'] = region | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  |         regions[region] = {'client': client, 'cluster_id': cluster_id} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for region in regions.keys(): | 
					
						
							|  |  |  |         client = regions[region]['client'] | 
					
						
							|  |  |  |         resp = client.describe_cluster(ClusterId=regions[region]['cluster_id']) | 
					
						
							|  |  |  |         resp['Cluster']['Name'].should.equal(region) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_new_params(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     resp = client.run_job_flow(**run_job_flow_args) | 
					
						
							|  |  |  |     resp.should.have.key('JobFlowId') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_visible_to_all_users(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     for expected in (True, False): | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |         args['VisibleToAllUsers'] = expected | 
					
						
							|  |  |  |         resp = client.run_job_flow(**args) | 
					
						
							|  |  |  |         cluster_id = resp['JobFlowId'] | 
					
						
							|  |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |         resp['Cluster']['VisibleToAllUsers'].should.equal(expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_instance_groups(): | 
					
						
							|  |  |  |     input_groups = dict((g['Name'], g) for g in input_instance_groups) | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['Instances'] = {'InstanceGroups': input_instance_groups} | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     groups = client.list_instance_groups(ClusterId=cluster_id)[ | 
					
						
							|  |  |  |         'InstanceGroups'] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for x in groups: | 
					
						
							|  |  |  |         y = input_groups[x['Name']] | 
					
						
							|  |  |  |         x.should.have.key('Id') | 
					
						
							|  |  |  |         x['RequestedInstanceCount'].should.equal(y['InstanceCount']) | 
					
						
							|  |  |  |         x['InstanceGroupType'].should.equal(y['InstanceRole']) | 
					
						
							|  |  |  |         x['InstanceType'].should.equal(y['InstanceType']) | 
					
						
							|  |  |  |         x['Market'].should.equal(y['Market']) | 
					
						
							|  |  |  |         if 'BidPrice' in y: | 
					
						
							|  |  |  |             x['BidPrice'].should.equal(y['BidPrice']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_set_termination_protection(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['Instances']['TerminationProtected'] = False | 
					
						
							|  |  |  |     resp = client.run_job_flow(**args) | 
					
						
							|  |  |  |     cluster_id = resp['JobFlowId'] | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |     resp['Cluster']['TerminationProtected'].should.equal(False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for expected in (True, False): | 
					
						
							|  |  |  |         resp = client.set_termination_protection(JobFlowIds=[cluster_id], | 
					
						
							|  |  |  |                                                  TerminationProtected=expected) | 
					
						
							|  |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |         resp['Cluster']['TerminationProtected'].should.equal(expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_set_visible_to_all_users(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['VisibleToAllUsers'] = False | 
					
						
							|  |  |  |     resp = client.run_job_flow(**args) | 
					
						
							|  |  |  |     cluster_id = resp['JobFlowId'] | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |     resp['Cluster']['VisibleToAllUsers'].should.equal(False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for expected in (True, False): | 
					
						
							|  |  |  |         resp = client.set_visible_to_all_users(JobFlowIds=[cluster_id], | 
					
						
							|  |  |  |                                                VisibleToAllUsers=expected) | 
					
						
							|  |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |         resp['Cluster']['VisibleToAllUsers'].should.equal(expected) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_terminate_job_flows(): | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.run_job_flow(**run_job_flow_args) | 
					
						
							|  |  |  |     cluster_id = resp['JobFlowId'] | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |     resp['Cluster']['Status']['State'].should.equal('WAITING') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							|  |  |  |     resp['Cluster']['Status']['State'].should.equal('TERMINATED') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # testing multiple end points for each feature | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_bootstrap_actions(): | 
					
						
							|  |  |  |     bootstrap_actions = [ | 
					
						
							|  |  |  |         {'Name': 'bs1', | 
					
						
							|  |  |  |          'ScriptBootstrapAction': { | 
					
						
							|  |  |  |              'Args': ['arg1', 'arg2'], | 
					
						
							| 
									
										
										
										
											2016-10-16 21:49:10 -07:00
										 |  |  |              'Path': 's3://path/to/script'}}, | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         {'Name': 'bs2', | 
					
						
							|  |  |  |          'ScriptBootstrapAction': { | 
					
						
							| 
									
										
										
										
											2016-10-16 21:49:10 -07:00
										 |  |  |              'Args': [], | 
					
						
							|  |  |  |              'Path': 's3://path/to/anotherscript'}} | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['BootstrapActions'] = bootstrap_actions | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cl = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     for x, y in zip(cl['BootstrapActions'], bootstrap_actions): | 
					
						
							|  |  |  |         x['BootstrapActionConfig'].should.equal(y) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_bootstrap_actions(ClusterId=cluster_id) | 
					
						
							|  |  |  |     for x, y in zip(resp['BootstrapActions'], bootstrap_actions): | 
					
						
							|  |  |  |         x['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  |         if 'Args' in y['ScriptBootstrapAction']: | 
					
						
							|  |  |  |             x['Args'].should.equal(y['ScriptBootstrapAction']['Args']) | 
					
						
							|  |  |  |         x['ScriptPath'].should.equal(y['ScriptBootstrapAction']['Path']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_instance_groups(): | 
					
						
							|  |  |  |     input_groups = dict((g['Name'], g) for g in input_instance_groups) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     for key in ['MasterInstanceType', 'SlaveInstanceType', 'InstanceCount']: | 
					
						
							|  |  |  |         del args['Instances'][key] | 
					
						
							|  |  |  |     args['Instances']['InstanceGroups'] = input_instance_groups[:2] | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     base_instance_count = jf['Instances']['InstanceCount'] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     client.add_instance_groups( | 
					
						
							|  |  |  |         JobFlowId=cluster_id, InstanceGroups=input_instance_groups[2:]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     jf['Instances']['InstanceCount'].should.equal( | 
					
						
							|  |  |  |         sum(g['InstanceCount'] for g in input_instance_groups)) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for x in jf['Instances']['InstanceGroups']: | 
					
						
							|  |  |  |         y = input_groups[x['Name']] | 
					
						
							|  |  |  |         if hasattr(y, 'BidPrice'): | 
					
						
							|  |  |  |             x['BidPrice'].should.equal('BidPrice') | 
					
						
							|  |  |  |         x['CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # x['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         x.should.have.key('InstanceGroupId') | 
					
						
							|  |  |  |         x['InstanceRequestCount'].should.equal(y['InstanceCount']) | 
					
						
							|  |  |  |         x['InstanceRole'].should.equal(y['InstanceRole']) | 
					
						
							|  |  |  |         x['InstanceRunningCount'].should.equal(y['InstanceCount']) | 
					
						
							|  |  |  |         x['InstanceType'].should.equal(y['InstanceType']) | 
					
						
							|  |  |  |         # x['LastStateChangeReason'].should.equal(y['LastStateChangeReason']) | 
					
						
							|  |  |  |         x['Market'].should.equal(y['Market']) | 
					
						
							|  |  |  |         x['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  |         x['ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         x['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         x['State'].should.equal('RUNNING') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     groups = client.list_instance_groups(ClusterId=cluster_id)[ | 
					
						
							|  |  |  |         'InstanceGroups'] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for x in groups: | 
					
						
							|  |  |  |         y = input_groups[x['Name']] | 
					
						
							|  |  |  |         if hasattr(y, 'BidPrice'): | 
					
						
							|  |  |  |             x['BidPrice'].should.equal('BidPrice') | 
					
						
							|  |  |  |         # Configurations | 
					
						
							|  |  |  |         # EbsBlockDevices | 
					
						
							|  |  |  |         # EbsOptimized | 
					
						
							|  |  |  |         x.should.have.key('Id') | 
					
						
							|  |  |  |         x['InstanceGroupType'].should.equal(y['InstanceRole']) | 
					
						
							|  |  |  |         x['InstanceType'].should.equal(y['InstanceType']) | 
					
						
							|  |  |  |         x['Market'].should.equal(y['Market']) | 
					
						
							|  |  |  |         x['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  |         x['RequestedInstanceCount'].should.equal(y['InstanceCount']) | 
					
						
							|  |  |  |         x['RunningInstanceCount'].should.equal(y['InstanceCount']) | 
					
						
							|  |  |  |         # ShrinkPolicy | 
					
						
							|  |  |  |         x['Status']['State'].should.equal('RUNNING') | 
					
						
							|  |  |  |         x['Status']['StateChangeReason']['Code'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         # x['Status']['StateChangeReason']['Message'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['Status']['Timeline'][ | 
					
						
							|  |  |  |             'CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['Status']['Timeline'][ | 
					
						
							|  |  |  |             'ReadyDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     igs = dict((g['Name'], g) for g in groups) | 
					
						
							|  |  |  |     client.modify_instance_groups( | 
					
						
							|  |  |  |         InstanceGroups=[ | 
					
						
							|  |  |  |             {'InstanceGroupId': igs['task-1']['Id'], | 
					
						
							|  |  |  |              'InstanceCount': 2}, | 
					
						
							|  |  |  |             {'InstanceGroupId': igs['task-2']['Id'], | 
					
						
							|  |  |  |              'InstanceCount': 3}]) | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     jf['Instances']['InstanceCount'].should.equal(base_instance_count + 5) | 
					
						
							|  |  |  |     igs = dict((g['Name'], g) for g in jf['Instances']['InstanceGroups']) | 
					
						
							|  |  |  |     igs['task-1']['InstanceRunningCount'].should.equal(2) | 
					
						
							|  |  |  |     igs['task-2']['InstanceRunningCount'].should.equal(3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_steps(): | 
					
						
							|  |  |  |     input_steps = [{ | 
					
						
							|  |  |  |         'HadoopJarStep': { | 
					
						
							|  |  |  |             'Args': [ | 
					
						
							|  |  |  |                 'hadoop-streaming', | 
					
						
							|  |  |  |                 '-files', 's3://elasticmapreduce/samples/wordcount/wordSplitter.py#wordSplitter.py', | 
					
						
							|  |  |  |                 '-mapper', 'python wordSplitter.py', | 
					
						
							|  |  |  |                 '-input', 's3://elasticmapreduce/samples/wordcount/input', | 
					
						
							|  |  |  |                 '-output', 's3://output_bucket/output/wordcount_output', | 
					
						
							|  |  |  |                 '-reducer', 'aggregate' | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             'Jar': 'command-runner.jar', | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         'Name': 'My wordcount example', | 
					
						
							|  |  |  |     }, { | 
					
						
							|  |  |  |         'HadoopJarStep': { | 
					
						
							|  |  |  |             'Args': [ | 
					
						
							|  |  |  |                 'hadoop-streaming', | 
					
						
							|  |  |  |                 '-files', 's3://elasticmapreduce/samples/wordcount/wordSplitter2.py#wordSplitter2.py', | 
					
						
							|  |  |  |                 '-mapper', 'python wordSplitter2.py', | 
					
						
							|  |  |  |                 '-input', 's3://elasticmapreduce/samples/wordcount/input2', | 
					
						
							|  |  |  |                 '-output', 's3://output_bucket/output/wordcount_output2', | 
					
						
							|  |  |  |                 '-reducer', 'aggregate' | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |             'Jar': 'command-runner.jar', | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         'Name': 'My wordcount example2', | 
					
						
							|  |  |  |     }] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # TODO: implementation and test for cancel_steps | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     args['Steps'] = [input_steps[0]] | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     jf['Steps'].should.have.length_of(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[input_steps[1]]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])['JobFlows'][0] | 
					
						
							|  |  |  |     jf['Steps'].should.have.length_of(2) | 
					
						
							|  |  |  |     for idx, (x, y) in enumerate(zip(jf['Steps'], input_steps)): | 
					
						
							|  |  |  |         x['ExecutionStatusDetail'].should.have.key('CreationDateTime') | 
					
						
							|  |  |  |         # x['ExecutionStatusDetail'].should.have.key('EndDateTime') | 
					
						
							|  |  |  |         # x['ExecutionStatusDetail'].should.have.key('LastStateChangeReason') | 
					
						
							|  |  |  |         # x['ExecutionStatusDetail'].should.have.key('StartDateTime') | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['ExecutionStatusDetail']['State'].should.equal( | 
					
						
							|  |  |  |             'STARTING' if idx == 0 else 'PENDING') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         x['StepConfig']['ActionOnFailure'].should.equal('TERMINATE_CLUSTER') | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['StepConfig']['HadoopJarStep'][ | 
					
						
							|  |  |  |             'Args'].should.equal(y['HadoopJarStep']['Args']) | 
					
						
							|  |  |  |         x['StepConfig']['HadoopJarStep'][ | 
					
						
							|  |  |  |             'Jar'].should.equal(y['HadoopJarStep']['Jar']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         if 'MainClass' in y['HadoopJarStep']: | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |             x['StepConfig']['HadoopJarStep']['MainClass'].should.equal( | 
					
						
							|  |  |  |                 y['HadoopJarStep']['MainClass']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         if 'Properties' in y['HadoopJarStep']: | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |             x['StepConfig']['HadoopJarStep']['Properties'].should.equal( | 
					
						
							|  |  |  |                 y['HadoopJarStep']['Properties']) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         x['StepConfig']['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     expected = dict((s['Name'], s) for s in input_steps) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     steps = client.list_steps(ClusterId=cluster_id)['Steps'] | 
					
						
							|  |  |  |     steps.should.have.length_of(2) | 
					
						
							|  |  |  |     for x in steps: | 
					
						
							|  |  |  |         y = expected[x['Name']] | 
					
						
							|  |  |  |         x['ActionOnFailure'].should.equal('TERMINATE_CLUSTER') | 
					
						
							|  |  |  |         x['Config']['Args'].should.equal(y['HadoopJarStep']['Args']) | 
					
						
							|  |  |  |         x['Config']['Jar'].should.equal(y['HadoopJarStep']['Jar']) | 
					
						
							|  |  |  |         # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) | 
					
						
							|  |  |  |         # Properties | 
					
						
							|  |  |  |         x['Id'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         x['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  |         x['Status']['State'].should.be.within(['STARTING', 'PENDING']) | 
					
						
							|  |  |  |         # StateChangeReason | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['Status']['Timeline'][ | 
					
						
							|  |  |  |             'CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         x = client.describe_step(ClusterId=cluster_id, StepId=x['Id'])['Step'] | 
					
						
							|  |  |  |         x['ActionOnFailure'].should.equal('TERMINATE_CLUSTER') | 
					
						
							|  |  |  |         x['Config']['Args'].should.equal(y['HadoopJarStep']['Args']) | 
					
						
							|  |  |  |         x['Config']['Jar'].should.equal(y['HadoopJarStep']['Jar']) | 
					
						
							|  |  |  |         # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) | 
					
						
							|  |  |  |         # Properties | 
					
						
							|  |  |  |         x['Id'].should.be.a(six.string_types) | 
					
						
							|  |  |  |         x['Name'].should.equal(y['Name']) | 
					
						
							|  |  |  |         x['Status']['State'].should.be.within(['STARTING', 'PENDING']) | 
					
						
							|  |  |  |         # StateChangeReason | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |         x['Status']['Timeline'][ | 
					
						
							|  |  |  |             'CreationDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     step_id = steps[0]['Id'] | 
					
						
							|  |  |  |     steps = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])['Steps'] | 
					
						
							|  |  |  |     steps.should.have.length_of(1) | 
					
						
							|  |  |  |     steps[0]['Id'].should.equal(step_id) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     steps = client.list_steps(ClusterId=cluster_id, | 
					
						
							|  |  |  |                               StepStates=['STARTING'])['Steps'] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     steps.should.have.length_of(1) | 
					
						
							|  |  |  |     steps[0]['Id'].should.equal(step_id) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_tags(): | 
					
						
							|  |  |  |     input_tags = [{'Key': 'newkey1', 'Value': 'newval1'}, | 
					
						
							|  |  |  |                   {'Key': 'newkey2', 'Value': 'newval2'}] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client = boto3.client('emr', region_name='us-east-1') | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**run_job_flow_args)['JobFlowId'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client.add_tags(ResourceId=cluster_id, Tags=input_tags) | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id)['Cluster'] | 
					
						
							|  |  |  |     resp['Tags'].should.have.length_of(2) | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     dict((t['Key'], t['Value']) for t in resp['Tags']).should.equal( | 
					
						
							|  |  |  |         dict((t['Key'], t['Value']) for t in input_tags)) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     client.remove_tags(ResourceId=cluster_id, TagKeys=[ | 
					
						
							|  |  |  |                        t['Key'] for t in input_tags]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id)['Cluster'] | 
					
						
							| 
									
										
										
										
											2016-10-16 21:49:10 -07:00
										 |  |  |     resp['Tags'].should.equal([]) |