| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | # -*- coding: utf-8 -*- | 
					
						
							|  |  |  | from __future__ import unicode_literals | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | from copy import deepcopy | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | from datetime import datetime | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | import boto3 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | import pytz | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | import six | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | import sure  # noqa | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | from botocore.exceptions import ClientError | 
					
						
							|  |  |  | from nose.tools import assert_raises | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | from moto import mock_emr | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | run_job_flow_args = dict( | 
					
						
							|  |  |  |     Instances={ | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         "InstanceCount": 3, | 
					
						
							|  |  |  |         "KeepJobFlowAliveWhenNoSteps": True, | 
					
						
							|  |  |  |         "MasterInstanceType": "c3.medium", | 
					
						
							|  |  |  |         "Placement": {"AvailabilityZone": "us-east-1a"}, | 
					
						
							|  |  |  |         "SlaveInstanceType": "c3.xlarge", | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     }, | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     JobFlowRole="EMR_EC2_DefaultRole", | 
					
						
							|  |  |  |     LogUri="s3://mybucket/log", | 
					
						
							|  |  |  |     Name="cluster", | 
					
						
							|  |  |  |     ServiceRole="EMR_DefaultRole", | 
					
						
							|  |  |  |     VisibleToAllUsers=True, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | input_instance_groups = [ | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     { | 
					
						
							|  |  |  |         "InstanceCount": 1, | 
					
						
							|  |  |  |         "InstanceRole": "MASTER", | 
					
						
							|  |  |  |         "InstanceType": "c1.medium", | 
					
						
							|  |  |  |         "Market": "ON_DEMAND", | 
					
						
							|  |  |  |         "Name": "master", | 
					
						
							|  |  |  |     }, | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         "InstanceCount": 3, | 
					
						
							|  |  |  |         "InstanceRole": "CORE", | 
					
						
							|  |  |  |         "InstanceType": "c1.medium", | 
					
						
							|  |  |  |         "Market": "ON_DEMAND", | 
					
						
							|  |  |  |         "Name": "core", | 
					
						
							|  |  |  |     }, | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         "InstanceCount": 6, | 
					
						
							|  |  |  |         "InstanceRole": "TASK", | 
					
						
							|  |  |  |         "InstanceType": "c1.large", | 
					
						
							|  |  |  |         "Market": "SPOT", | 
					
						
							|  |  |  |         "Name": "task-1", | 
					
						
							|  |  |  |         "BidPrice": "0.07", | 
					
						
							|  |  |  |     }, | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         "InstanceCount": 10, | 
					
						
							|  |  |  |         "InstanceRole": "TASK", | 
					
						
							|  |  |  |         "InstanceType": "c1.xlarge", | 
					
						
							|  |  |  |         "Market": "SPOT", | 
					
						
							|  |  |  |         "Name": "task-2", | 
					
						
							|  |  |  |         "BidPrice": "0.05", | 
					
						
							|  |  |  |     }, | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | @mock_emr | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | def test_describe_cluster(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["Applications"] = [{"Name": "Spark", "Version": "2.4.2"}] | 
					
						
							|  |  |  |     args["Configurations"] = [ | 
					
						
							|  |  |  |         { | 
					
						
							|  |  |  |             "Classification": "yarn-site", | 
					
						
							|  |  |  |             "Properties": { | 
					
						
							|  |  |  |                 "someproperty": "somevalue", | 
					
						
							|  |  |  |                 "someotherproperty": "someothervalue", | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         { | 
					
						
							|  |  |  |             "Classification": "nested-configs", | 
					
						
							|  |  |  |             "Properties": {}, | 
					
						
							|  |  |  |             "Configurations": [ | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "Classification": "nested-config", | 
					
						
							|  |  |  |                     "Properties": {"nested-property": "nested-value"}, | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |         }, | 
					
						
							| 
									
										
										
										
											2017-06-27 11:31:43 -07:00
										 |  |  |     ] | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["Instances"]["AdditionalMasterSecurityGroups"] = ["additional-master"] | 
					
						
							|  |  |  |     args["Instances"]["AdditionalSlaveSecurityGroups"] = ["additional-slave"] | 
					
						
							|  |  |  |     args["Instances"]["Ec2KeyName"] = "mykey" | 
					
						
							|  |  |  |     args["Instances"]["Ec2SubnetId"] = "subnet-8be41cec" | 
					
						
							|  |  |  |     args["Instances"]["EmrManagedMasterSecurityGroup"] = "master-security-group" | 
					
						
							|  |  |  |     args["Instances"]["EmrManagedSlaveSecurityGroup"] = "slave-security-group" | 
					
						
							|  |  |  |     args["Instances"]["KeepJobFlowAliveWhenNoSteps"] = False | 
					
						
							|  |  |  |     args["Instances"]["ServiceAccessSecurityGroup"] = "service-access-security-group" | 
					
						
							|  |  |  |     args["Tags"] = [{"Key": "tag1", "Value": "val1"}, {"Key": "tag2", "Value": "val2"}] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     cl = client.describe_cluster(ClusterId=cluster_id)["Cluster"] | 
					
						
							|  |  |  |     cl["Applications"][0]["Name"].should.equal("Spark") | 
					
						
							|  |  |  |     cl["Applications"][0]["Version"].should.equal("2.4.2") | 
					
						
							|  |  |  |     cl["AutoTerminate"].should.equal(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     config = cl["Configurations"][0] | 
					
						
							|  |  |  |     config["Classification"].should.equal("yarn-site") | 
					
						
							|  |  |  |     config["Properties"].should.equal(args["Configurations"][0]["Properties"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     nested_config = cl["Configurations"][1] | 
					
						
							|  |  |  |     nested_config["Classification"].should.equal("nested-configs") | 
					
						
							|  |  |  |     nested_config["Properties"].should.equal(args["Configurations"][1]["Properties"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     attrs = cl["Ec2InstanceAttributes"] | 
					
						
							|  |  |  |     attrs["AdditionalMasterSecurityGroups"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["AdditionalMasterSecurityGroups"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     attrs["AdditionalSlaveSecurityGroups"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["AdditionalSlaveSecurityGroups"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     attrs["Ec2AvailabilityZone"].should.equal("us-east-1a") | 
					
						
							|  |  |  |     attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"]) | 
					
						
							|  |  |  |     attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"]) | 
					
						
							|  |  |  |     attrs["EmrManagedMasterSecurityGroup"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["EmrManagedMasterSecurityGroup"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     attrs["EmrManagedSlaveSecurityGroup"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["EmrManagedSlaveSecurityGroup"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     attrs["IamInstanceProfile"].should.equal(args["JobFlowRole"]) | 
					
						
							|  |  |  |     attrs["ServiceAccessSecurityGroup"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["ServiceAccessSecurityGroup"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     cl["Id"].should.equal(cluster_id) | 
					
						
							|  |  |  |     cl["LogUri"].should.equal(args["LogUri"]) | 
					
						
							|  |  |  |     cl["MasterPublicDnsName"].should.be.a(six.string_types) | 
					
						
							|  |  |  |     cl["Name"].should.equal(args["Name"]) | 
					
						
							|  |  |  |     cl["NormalizedInstanceHours"].should.equal(0) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # cl['ReleaseLabel'].should.equal('emr-5.0.0') | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cl.shouldnt.have.key("RequestedAmiVersion") | 
					
						
							|  |  |  |     cl["RunningAmiVersion"].should.equal("1.0.0") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # cl['SecurityConfiguration'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cl["ServiceRole"].should.equal(args["ServiceRole"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     status = cl["Status"] | 
					
						
							|  |  |  |     status["State"].should.equal("TERMINATED") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # cluster['Status']['StateChangeReason'] | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     status["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # status['Timeline']['EndDateTime'].should.equal(datetime(2014, 1, 24, 2, 19, 46, tzinfo=pytz.utc)) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     status["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     dict((t["Key"], t["Value"]) for t in cl["Tags"]).should.equal( | 
					
						
							|  |  |  |         dict((t["Key"], t["Value"]) for t in args["Tags"]) | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cl["TerminationProtected"].should.equal(False) | 
					
						
							|  |  |  |     cl["VisibleToAllUsers"].should.equal(True) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_cluster_not_found(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     conn = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 |  |  |     raised = False | 
					
						
							|  |  |  |     try: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster = conn.describe_cluster(ClusterId="DummyId") | 
					
						
							| 
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 |  |  |     except ClientError as e: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         if e.response["Error"]["Code"] == "ResourceNotFoundException": | 
					
						
							| 
									
										
										
										
											2017-05-10 21:58:42 -04:00
										 |  |  |             raised = True | 
					
						
							|  |  |  |     raised.should.equal(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_job_flows(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							|  |  |  |     expected = {} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     for idx in range(4): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster_name = "cluster" + str(idx) | 
					
						
							|  |  |  |         args["Name"] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         expected[cluster_id] = { | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             "Id": cluster_id, | 
					
						
							|  |  |  |             "Name": cluster_name, | 
					
						
							|  |  |  |             "State": "WAITING", | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # need sleep since it appears the timestamp is always rounded to | 
					
						
							|  |  |  |     # the nearest second internally | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  |     timestamp = datetime.now(pytz.utc) | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     for idx in range(4, 6): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster_name = "cluster" + str(idx) | 
					
						
							|  |  |  |         args["Name"] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             "Id": cluster_id, | 
					
						
							|  |  |  |             "Name": cluster_name, | 
					
						
							|  |  |  |             "State": "TERMINATED", | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows() | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["JobFlows"].should.have.length_of(6) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     for cluster_id, y in expected.items(): | 
					
						
							|  |  |  |         resp = client.describe_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp["JobFlows"].should.have.length_of(1) | 
					
						
							|  |  |  |         resp["JobFlows"][0]["JobFlowId"].should.equal(cluster_id) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp = client.describe_job_flows(JobFlowStates=["WAITING"]) | 
					
						
							|  |  |  |     resp["JobFlows"].should.have.length_of(4) | 
					
						
							|  |  |  |     for x in resp["JobFlows"]: | 
					
						
							|  |  |  |         x["ExecutionStatusDetail"]["State"].should.equal("WAITING") | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows(CreatedBefore=timestamp) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["JobFlows"].should.have.length_of(4) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.describe_job_flows(CreatedAfter=timestamp) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["JobFlows"].should.have.length_of(2) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_describe_job_flow(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["AmiVersion"] = "3.8.1" | 
					
						
							|  |  |  |     args["Instances"].update( | 
					
						
							|  |  |  |         { | 
					
						
							|  |  |  |             "Ec2KeyName": "ec2keyname", | 
					
						
							|  |  |  |             "Ec2SubnetId": "subnet-8be41cec", | 
					
						
							|  |  |  |             "HadoopVersion": "2.4.0", | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     args["VisibleToAllUsers"] = True | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     jf["AmiVersion"].should.equal(args["AmiVersion"]) | 
					
						
							|  |  |  |     jf.shouldnt.have.key("BootstrapActions") | 
					
						
							|  |  |  |     esd = jf["ExecutionStatusDetail"] | 
					
						
							|  |  |  |     esd["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # esd['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |     # esd['LastStateChangeReason'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     esd["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |     esd["StartDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |     esd["State"].should.equal("WAITING") | 
					
						
							|  |  |  |     attrs = jf["Instances"] | 
					
						
							|  |  |  |     attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"]) | 
					
						
							|  |  |  |     attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"]) | 
					
						
							|  |  |  |     attrs["HadoopVersion"].should.equal(args["Instances"]["HadoopVersion"]) | 
					
						
							|  |  |  |     attrs["InstanceCount"].should.equal(args["Instances"]["InstanceCount"]) | 
					
						
							|  |  |  |     for ig in attrs["InstanceGroups"]: | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # ig['BidPrice'] | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         ig["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # ig['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         ig["InstanceGroupId"].should.be.a(six.string_types) | 
					
						
							|  |  |  |         ig["InstanceRequestCount"].should.be.a(int) | 
					
						
							|  |  |  |         ig["InstanceRole"].should.be.within(["MASTER", "CORE"]) | 
					
						
							|  |  |  |         ig["InstanceRunningCount"].should.be.a(int) | 
					
						
							|  |  |  |         ig["InstanceType"].should.be.within(["c3.medium", "c3.xlarge"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # ig['LastStateChangeReason'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         ig["Market"].should.equal("ON_DEMAND") | 
					
						
							|  |  |  |         ig["Name"].should.be.a(six.string_types) | 
					
						
							|  |  |  |         ig["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |         ig["StartDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |         ig["State"].should.equal("RUNNING") | 
					
						
							|  |  |  |     attrs["KeepJobFlowAliveWhenNoSteps"].should.equal(True) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     # attrs['MasterInstanceId'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     attrs["MasterInstanceType"].should.equal(args["Instances"]["MasterInstanceType"]) | 
					
						
							|  |  |  |     attrs["MasterPublicDnsName"].should.be.a(six.string_types) | 
					
						
							|  |  |  |     attrs["NormalizedInstanceHours"].should.equal(0) | 
					
						
							|  |  |  |     attrs["Placement"]["AvailabilityZone"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["Placement"]["AvailabilityZone"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     attrs["SlaveInstanceType"].should.equal(args["Instances"]["SlaveInstanceType"]) | 
					
						
							|  |  |  |     attrs["TerminationProtected"].should.equal(False) | 
					
						
							|  |  |  |     jf["JobFlowId"].should.equal(cluster_id) | 
					
						
							|  |  |  |     jf["JobFlowRole"].should.equal(args["JobFlowRole"]) | 
					
						
							|  |  |  |     jf["LogUri"].should.equal(args["LogUri"]) | 
					
						
							|  |  |  |     jf["Name"].should.equal(args["Name"]) | 
					
						
							|  |  |  |     jf["ServiceRole"].should.equal(args["ServiceRole"]) | 
					
						
							|  |  |  |     jf["Steps"].should.equal([]) | 
					
						
							|  |  |  |     jf["SupportedProducts"].should.equal([]) | 
					
						
							|  |  |  |     jf["VisibleToAllUsers"].should.equal(True) | 
					
						
							| 
									
										
										
										
											2015-11-15 17:24:36 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_list_clusters(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     expected = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for idx in range(40): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster_name = "jobflow" + str(idx) | 
					
						
							|  |  |  |         args["Name"] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         expected[cluster_id] = { | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             "Id": cluster_id, | 
					
						
							|  |  |  |             "Name": cluster_name, | 
					
						
							|  |  |  |             "NormalizedInstanceHours": 0, | 
					
						
							|  |  |  |             "State": "WAITING", | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # need sleep since it appears the timestamp is always rounded to | 
					
						
							|  |  |  |     # the nearest second internally | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  |     timestamp = datetime.now(pytz.utc) | 
					
						
							|  |  |  |     time.sleep(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for idx in range(40, 70): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster_name = "jobflow" + str(idx) | 
					
						
							|  |  |  |         args["Name"] = cluster_name | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |         expected[cluster_id] = { | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             "Id": cluster_id, | 
					
						
							|  |  |  |             "Name": cluster_name, | 
					
						
							|  |  |  |             "NormalizedInstanceHours": 0, | 
					
						
							|  |  |  |             "State": "TERMINATED", | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     args = {} | 
					
						
							|  |  |  |     while 1: | 
					
						
							|  |  |  |         resp = client.list_clusters(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         clusters = resp["Clusters"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         len(clusters).should.be.lower_than_or_equal_to(50) | 
					
						
							|  |  |  |         for x in clusters: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             y = expected[x["Id"]] | 
					
						
							|  |  |  |             x["Id"].should.equal(y["Id"]) | 
					
						
							|  |  |  |             x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |             x["NormalizedInstanceHours"].should.equal(y["NormalizedInstanceHours"]) | 
					
						
							|  |  |  |             x["Status"]["State"].should.equal(y["State"]) | 
					
						
							|  |  |  |             x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |             if y["State"] == "TERMINATED": | 
					
						
							|  |  |  |                 x["Status"]["Timeline"]["EndDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |             else: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |                 x["Status"]["Timeline"].shouldnt.have.key("EndDateTime") | 
					
						
							|  |  |  |             x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |         marker = resp.get("Marker") | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |         if marker is None: | 
					
						
							|  |  |  |             break | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args = {"Marker": marker} | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp = client.list_clusters(ClusterStates=["TERMINATED"]) | 
					
						
							|  |  |  |     resp["Clusters"].should.have.length_of(30) | 
					
						
							|  |  |  |     for x in resp["Clusters"]: | 
					
						
							|  |  |  |         x["Status"]["State"].should.equal("TERMINATED") | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_clusters(CreatedBefore=timestamp) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Clusters"].should.have.length_of(40) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_clusters(CreatedAfter=timestamp) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Clusters"].should.have.length_of(30) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							|  |  |  |     resp = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     resp["ExecutionStatusDetail"]["State"].should.equal("WAITING") | 
					
						
							|  |  |  |     resp["JobFlowId"].should.equal(cluster_id) | 
					
						
							|  |  |  |     resp["Name"].should.equal(args["Name"]) | 
					
						
							|  |  |  |     resp["Instances"]["MasterInstanceType"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["MasterInstanceType"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     resp["Instances"]["SlaveInstanceType"].should.equal( | 
					
						
							|  |  |  |         args["Instances"]["SlaveInstanceType"] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     resp["LogUri"].should.equal(args["LogUri"]) | 
					
						
							|  |  |  |     resp["VisibleToAllUsers"].should.equal(args["VisibleToAllUsers"]) | 
					
						
							|  |  |  |     resp["Instances"]["NormalizedInstanceHours"].should.equal(0) | 
					
						
							|  |  |  |     resp["Steps"].should.equal([]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_invalid_params(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2017-02-23 19:43:48 -05:00
										 |  |  |     with assert_raises(ClientError) as ex: | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # cannot set both AmiVersion and ReleaseLabel | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["AmiVersion"] = "2.4" | 
					
						
							|  |  |  |         args["ReleaseLabel"] = "emr-5.0.0" | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     ex.exception.response["Error"]["Code"].should.equal("ValidationException") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_in_multiple_regions(): | 
					
						
							|  |  |  |     regions = {} | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     for region in ["us-east-1", "eu-west-1"]: | 
					
						
							|  |  |  |         client = boto3.client("emr", region_name=region) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["Name"] = region | 
					
						
							|  |  |  |         cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							|  |  |  |         regions[region] = {"client": client, "cluster_id": cluster_id} | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     for region in regions.keys(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         client = regions[region]["client"] | 
					
						
							|  |  |  |         resp = client.describe_cluster(ClusterId=regions[region]["cluster_id"]) | 
					
						
							|  |  |  |         resp["Cluster"]["Name"].should.equal(region) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_new_params(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.run_job_flow(**run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp.should.have.key("JobFlowId") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_visible_to_all_users(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for expected in (True, False): | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["VisibleToAllUsers"] = expected | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         resp = client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         cluster_id = resp["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_instance_groups(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     input_groups = dict((g["Name"], g) for g in input_instance_groups) | 
					
						
							|  |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["Instances"] = {"InstanceGroups": input_instance_groups} | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							|  |  |  |     groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for x in groups: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         y = input_groups[x["Name"]] | 
					
						
							|  |  |  |         x.should.have.key("Id") | 
					
						
							|  |  |  |         x["RequestedInstanceCount"].should.equal(y["InstanceCount"]) | 
					
						
							|  |  |  |         x["InstanceGroupType"].should.equal(y["InstanceRole"]) | 
					
						
							|  |  |  |         x["InstanceType"].should.equal(y["InstanceType"]) | 
					
						
							|  |  |  |         x["Market"].should.equal(y["Market"]) | 
					
						
							|  |  |  |         if "BidPrice" in y: | 
					
						
							|  |  |  |             x["BidPrice"].should.equal(y["BidPrice"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_run_job_flow_with_custom_ami(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     with assert_raises(ClientError) as ex: | 
					
						
							|  |  |  |         # CustomAmiId available in Amazon EMR 5.7.0 and later | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["CustomAmiId"] = "MyEmrCustomId" | 
					
						
							|  |  |  |         args["ReleaseLabel"] = "emr-5.6.0" | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  |         client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     ex.exception.response["Error"]["Code"].should.equal("ValidationException") | 
					
						
							|  |  |  |     ex.exception.response["Error"]["Message"].should.equal("Custom AMI is not allowed") | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     with assert_raises(ClientError) as ex: | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["CustomAmiId"] = "MyEmrCustomId" | 
					
						
							|  |  |  |         args["AmiVersion"] = "3.8.1" | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  |         client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     ex.exception.response["Error"]["Code"].should.equal("ValidationException") | 
					
						
							|  |  |  |     ex.exception.response["Error"]["Message"].should.equal( | 
					
						
							|  |  |  |         "Custom AMI is not supported in this version of EMR" | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     with assert_raises(ClientError) as ex: | 
					
						
							|  |  |  |         # AMI version and release label exception  raises before CustomAmi exception | 
					
						
							|  |  |  |         args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         args["CustomAmiId"] = "MyEmrCustomId" | 
					
						
							|  |  |  |         args["ReleaseLabel"] = "emr-5.6.0" | 
					
						
							|  |  |  |         args["AmiVersion"] = "3.8.1" | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  |         client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     ex.exception.response["Error"]["Code"].should.equal("ValidationException") | 
					
						
							|  |  |  |     ex.exception.response["Error"]["Message"].should.contain( | 
					
						
							|  |  |  |         "Only one AMI version and release label may be specified." | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["CustomAmiId"] = "MyEmrCustomAmi" | 
					
						
							|  |  |  |     args["ReleaseLabel"] = "emr-5.7.0" | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Cluster"]["CustomAmiId"].should.equal("MyEmrCustomAmi") | 
					
						
							| 
									
										
										
										
											2019-05-25 05:19:26 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_set_termination_protection(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["Instances"]["TerminationProtected"] = False | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cluster_id = resp["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Cluster"]["TerminationProtected"].should.equal(False) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     for expected in (True, False): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp = client.set_termination_protection( | 
					
						
							|  |  |  |             JobFlowIds=[cluster_id], TerminationProtected=expected | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp["Cluster"]["TerminationProtected"].should.equal(expected) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_set_visible_to_all_users(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["VisibleToAllUsers"] = False | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.run_job_flow(**args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cluster_id = resp["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Cluster"]["VisibleToAllUsers"].should.equal(False) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     for expected in (True, False): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp = client.set_visible_to_all_users( | 
					
						
							|  |  |  |             JobFlowIds=[cluster_id], VisibleToAllUsers=expected | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_terminate_job_flows(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.run_job_flow(**run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cluster_id = resp["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Cluster"]["Status"]["State"].should.equal("WAITING") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.terminate_job_flows(JobFlowIds=[cluster_id]) | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp["Cluster"]["Status"]["State"].should.equal("TERMINATED") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # testing multiple end points for each feature | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | @mock_emr | 
					
						
							|  |  |  | def test_bootstrap_actions(): | 
					
						
							|  |  |  |     bootstrap_actions = [ | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         { | 
					
						
							|  |  |  |             "Name": "bs1", | 
					
						
							|  |  |  |             "ScriptBootstrapAction": { | 
					
						
							|  |  |  |                 "Args": ["arg1", "arg2"], | 
					
						
							|  |  |  |                 "Path": "s3://path/to/script", | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         { | 
					
						
							|  |  |  |             "Name": "bs2", | 
					
						
							|  |  |  |             "ScriptBootstrapAction": {"Args": [], "Path": "s3://path/to/anotherscript"}, | 
					
						
							|  |  |  |         }, | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     ] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["BootstrapActions"] = bootstrap_actions | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     cl = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     for x, y in zip(cl["BootstrapActions"], bootstrap_actions): | 
					
						
							|  |  |  |         x["BootstrapActionConfig"].should.equal(y) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     resp = client.list_bootstrap_actions(ClusterId=cluster_id) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     for x, y in zip(resp["BootstrapActions"], bootstrap_actions): | 
					
						
							|  |  |  |         x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |         if "Args" in y["ScriptBootstrapAction"]: | 
					
						
							|  |  |  |             x["Args"].should.equal(y["ScriptBootstrapAction"]["Args"]) | 
					
						
							|  |  |  |         x["ScriptPath"].should.equal(y["ScriptBootstrapAction"]["Path"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_instance_groups(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     input_groups = dict((g["Name"], g) for g in input_instance_groups) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     for key in ["MasterInstanceType", "SlaveInstanceType", "InstanceCount"]: | 
					
						
							|  |  |  |         del args["Instances"][key] | 
					
						
							|  |  |  |     args["Instances"]["InstanceGroups"] = input_instance_groups[:2] | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     base_instance_count = jf["Instances"]["InstanceCount"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-02-23 21:37:43 -05:00
										 |  |  |     client.add_instance_groups( | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         JobFlowId=cluster_id, InstanceGroups=input_instance_groups[2:] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     jf["Instances"]["InstanceCount"].should.equal( | 
					
						
							|  |  |  |         sum(g["InstanceCount"] for g in input_instance_groups) | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     for x in jf["Instances"]["InstanceGroups"]: | 
					
						
							|  |  |  |         y = input_groups[x["Name"]] | 
					
						
							|  |  |  |         if hasattr(y, "BidPrice"): | 
					
						
							|  |  |  |             x["BidPrice"].should.equal("BidPrice") | 
					
						
							|  |  |  |         x["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x.should.have.key("InstanceGroupId") | 
					
						
							|  |  |  |         x["InstanceRequestCount"].should.equal(y["InstanceCount"]) | 
					
						
							|  |  |  |         x["InstanceRole"].should.equal(y["InstanceRole"]) | 
					
						
							|  |  |  |         x["InstanceRunningCount"].should.equal(y["InstanceCount"]) | 
					
						
							|  |  |  |         x["InstanceType"].should.equal(y["InstanceType"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['LastStateChangeReason'].should.equal(y['LastStateChangeReason']) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Market"].should.equal(y["Market"]) | 
					
						
							|  |  |  |         x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |         x["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |         x["StartDateTime"].should.be.a("datetime.datetime") | 
					
						
							|  |  |  |         x["State"].should.equal("RUNNING") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     for x in groups: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         y = input_groups[x["Name"]] | 
					
						
							|  |  |  |         if hasattr(y, "BidPrice"): | 
					
						
							|  |  |  |             x["BidPrice"].should.equal("BidPrice") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # Configurations | 
					
						
							|  |  |  |         # EbsBlockDevices | 
					
						
							|  |  |  |         # EbsOptimized | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x.should.have.key("Id") | 
					
						
							|  |  |  |         x["InstanceGroupType"].should.equal(y["InstanceRole"]) | 
					
						
							|  |  |  |         x["InstanceType"].should.equal(y["InstanceType"]) | 
					
						
							|  |  |  |         x["Market"].should.equal(y["Market"]) | 
					
						
							|  |  |  |         x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |         x["RequestedInstanceCount"].should.equal(y["InstanceCount"]) | 
					
						
							|  |  |  |         x["RunningInstanceCount"].should.equal(y["InstanceCount"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # ShrinkPolicy | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Status"]["State"].should.equal("RUNNING") | 
					
						
							|  |  |  |         x["Status"]["StateChangeReason"]["Code"].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['StateChangeReason']['Message'].should.be.a(six.string_types) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     igs = dict((g["Name"], g) for g in groups) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     client.modify_instance_groups( | 
					
						
							|  |  |  |         InstanceGroups=[ | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |             {"InstanceGroupId": igs["task-1"]["Id"], "InstanceCount": 2}, | 
					
						
							|  |  |  |             {"InstanceGroupId": igs["task-2"]["Id"], "InstanceCount": 3}, | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     jf["Instances"]["InstanceCount"].should.equal(base_instance_count + 5) | 
					
						
							|  |  |  |     igs = dict((g["Name"], g) for g in jf["Instances"]["InstanceGroups"]) | 
					
						
							|  |  |  |     igs["task-1"]["InstanceRunningCount"].should.equal(2) | 
					
						
							|  |  |  |     igs["task-2"]["InstanceRunningCount"].should.equal(3) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_steps(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     input_steps = [ | 
					
						
							|  |  |  |         { | 
					
						
							|  |  |  |             "HadoopJarStep": { | 
					
						
							|  |  |  |                 "Args": [ | 
					
						
							|  |  |  |                     "hadoop-streaming", | 
					
						
							|  |  |  |                     "-files", | 
					
						
							|  |  |  |                     "s3://elasticmapreduce/samples/wordcount/wordSplitter.py#wordSplitter.py", | 
					
						
							|  |  |  |                     "-mapper", | 
					
						
							|  |  |  |                     "python wordSplitter.py", | 
					
						
							|  |  |  |                     "-input", | 
					
						
							|  |  |  |                     "s3://elasticmapreduce/samples/wordcount/input", | 
					
						
							|  |  |  |                     "-output", | 
					
						
							|  |  |  |                     "s3://output_bucket/output/wordcount_output", | 
					
						
							|  |  |  |                     "-reducer", | 
					
						
							|  |  |  |                     "aggregate", | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |                 "Jar": "command-runner.jar", | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             "Name": "My wordcount example", | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         }, | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         { | 
					
						
							|  |  |  |             "HadoopJarStep": { | 
					
						
							|  |  |  |                 "Args": [ | 
					
						
							|  |  |  |                     "hadoop-streaming", | 
					
						
							|  |  |  |                     "-files", | 
					
						
							|  |  |  |                     "s3://elasticmapreduce/samples/wordcount/wordSplitter2.py#wordSplitter2.py", | 
					
						
							|  |  |  |                     "-mapper", | 
					
						
							|  |  |  |                     "python wordSplitter2.py", | 
					
						
							|  |  |  |                     "-input", | 
					
						
							|  |  |  |                     "s3://elasticmapreduce/samples/wordcount/input2", | 
					
						
							|  |  |  |                     "-output", | 
					
						
							|  |  |  |                     "s3://output_bucket/output/wordcount_output2", | 
					
						
							|  |  |  |                     "-reducer", | 
					
						
							|  |  |  |                     "aggregate", | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |                 "Jar": "command-runner.jar", | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             "Name": "My wordcount example2", | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         }, | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     ] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # TODO: implementation and test for cancel_steps | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     args = deepcopy(run_job_flow_args) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     args["Steps"] = [input_steps[0]] | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     jf["Steps"].should.have.length_of(1) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[input_steps[1]]) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] | 
					
						
							|  |  |  |     jf["Steps"].should.have.length_of(2) | 
					
						
							|  |  |  |     for idx, (x, y) in enumerate(zip(jf["Steps"], input_steps)): | 
					
						
							|  |  |  |         x["ExecutionStatusDetail"].should.have.key("CreationDateTime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['ExecutionStatusDetail'].should.have.key('EndDateTime') | 
					
						
							|  |  |  |         # x['ExecutionStatusDetail'].should.have.key('LastStateChangeReason') | 
					
						
							|  |  |  |         # x['ExecutionStatusDetail'].should.have.key('StartDateTime') | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["ExecutionStatusDetail"]["State"].should.equal( | 
					
						
							|  |  |  |             "STARTING" if idx == 0 else "PENDING" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         x["StepConfig"]["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") | 
					
						
							|  |  |  |         x["StepConfig"]["HadoopJarStep"]["Args"].should.equal( | 
					
						
							|  |  |  |             y["HadoopJarStep"]["Args"] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         x["StepConfig"]["HadoopJarStep"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) | 
					
						
							|  |  |  |         if "MainClass" in y["HadoopJarStep"]: | 
					
						
							|  |  |  |             x["StepConfig"]["HadoopJarStep"]["MainClass"].should.equal( | 
					
						
							|  |  |  |                 y["HadoopJarStep"]["MainClass"] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         if "Properties" in y["HadoopJarStep"]: | 
					
						
							|  |  |  |             x["StepConfig"]["HadoopJarStep"]["Properties"].should.equal( | 
					
						
							|  |  |  |                 y["HadoopJarStep"]["Properties"] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         x["StepConfig"]["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     expected = dict((s["Name"], s) for s in input_steps) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     steps = client.list_steps(ClusterId=cluster_id)["Steps"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |     steps.should.have.length_of(2) | 
					
						
							|  |  |  |     for x in steps: | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         y = expected[x["Name"]] | 
					
						
							|  |  |  |         x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") | 
					
						
							|  |  |  |         x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"]) | 
					
						
							|  |  |  |         x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) | 
					
						
							|  |  |  |         # Properties | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Id"].should.be.a(six.string_types) | 
					
						
							|  |  |  |         x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |         x["Status"]["State"].should.be.within(["STARTING", "PENDING"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # StateChangeReason | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x = client.describe_step(ClusterId=cluster_id, StepId=x["Id"])["Step"] | 
					
						
							|  |  |  |         x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") | 
					
						
							|  |  |  |         x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"]) | 
					
						
							|  |  |  |         x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) | 
					
						
							|  |  |  |         # Properties | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Id"].should.be.a(six.string_types) | 
					
						
							|  |  |  |         x["Name"].should.equal(y["Name"]) | 
					
						
							|  |  |  |         x["Status"]["State"].should.be.within(["STARTING", "PENDING"]) | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # StateChangeReason | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |         x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  |         # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  |         # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     step_id = steps[0]["Id"] | 
					
						
							|  |  |  |     steps = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])["Steps"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     steps.should.have.length_of(1) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     steps[0]["Id"].should.equal(step_id) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     steps = client.list_steps(ClusterId=cluster_id, StepStates=["STARTING"])["Steps"] | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  |     steps.should.have.length_of(1) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     steps[0]["Id"].should.equal(step_id) | 
					
						
							| 
									
										
										
										
											2016-10-18 16:47:02 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | @mock_emr | 
					
						
							|  |  |  | def test_tags(): | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     input_tags = [ | 
					
						
							|  |  |  |         {"Key": "newkey1", "Value": "newval1"}, | 
					
						
							|  |  |  |         {"Key": "newkey2", "Value": "newval2"}, | 
					
						
							|  |  |  |     ] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     client = boto3.client("emr", region_name="us-east-1") | 
					
						
							|  |  |  |     cluster_id = client.run_job_flow(**run_job_flow_args)["JobFlowId"] | 
					
						
							| 
									
										
										
										
											2016-09-21 20:59:19 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     client.add_tags(ResourceId=cluster_id, Tags=input_tags) | 
					
						
							| 
									
										
										
										
											2019-10-31 08:44:26 -07:00
										 |  |  |     resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] | 
					
						
							|  |  |  |     resp["Tags"].should.have.length_of(2) | 
					
						
							|  |  |  |     dict((t["Key"], t["Value"]) for t in resp["Tags"]).should.equal( | 
					
						
							|  |  |  |         dict((t["Key"], t["Value"]) for t in input_tags) | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     client.remove_tags(ResourceId=cluster_id, TagKeys=[t["Key"] for t in input_tags]) | 
					
						
							|  |  |  |     resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] | 
					
						
							|  |  |  |     resp["Tags"].should.equal([]) |