# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import time
from copy import deepcopy
from datetime import datetime

import boto3
import json
import pytz
import sure  # noqa
from botocore.exceptions import ClientError

import pytest

from moto import mock_emr
from moto.core import ACCOUNT_ID


run_job_flow_args = dict(
    Instances={
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
        "MasterInstanceType": "c3.medium",
        "Placement": {"AvailabilityZone": "us-east-1a"},
        "SlaveInstanceType": "c3.xlarge",
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    LogUri="s3://mybucket/log",
    Name="cluster",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)


input_instance_groups = [
    {
        "InstanceCount": 1,
        "InstanceRole": "MASTER",
        "InstanceType": "c1.medium",
        "Market": "ON_DEMAND",
        "Name": "master",
    },
    {
        "InstanceCount": 3,
        "InstanceRole": "CORE",
        "InstanceType": "c1.medium",
        "Market": "ON_DEMAND",
        "Name": "core",
    },
    {
        "InstanceCount": 6,
        "InstanceRole": "TASK",
        "InstanceType": "c1.large",
        "Market": "SPOT",
        "Name": "task-1",
        "BidPrice": "0.07",
    },
    {
        "InstanceCount": 10,
        "InstanceRole": "TASK",
        "InstanceType": "c1.xlarge",
        "Market": "SPOT",
        "Name": "task-2",
        "BidPrice": "0.05",
        "EbsConfiguration": {
            "EbsBlockDeviceConfigs": [
                {
                    "VolumeSpecification": {"VolumeType": "gp2", "SizeInGB": 800},
                    "VolumesPerInstance": 6,
                },
            ],
            "EbsOptimized": True,
        },
    },
]


@mock_emr
def test_describe_cluster():
    region_name = "us-east-1"
    client = boto3.client("emr", region_name=region_name)
    args = deepcopy(run_job_flow_args)
    args["Applications"] = [{"Name": "Spark", "Version": "2.4.2"}]
    args["Configurations"] = [
        {
            "Classification": "yarn-site",
            "Properties": {
                "someproperty": "somevalue",
                "someotherproperty": "someothervalue",
            },
        },
        {
            "Classification": "nested-configs",
            "Properties": {},
            "Configurations": [
                {
                    "Classification": "nested-config",
                    "Properties": {"nested-property": "nested-value"},
                }
            ],
        },
    ]
    args["Instances"]["AdditionalMasterSecurityGroups"] = ["additional-master"]
    args["Instances"]["AdditionalSlaveSecurityGroups"] = ["additional-slave"]
    args["Instances"]["Ec2KeyName"] = "mykey"
    args["Instances"]["Ec2SubnetId"] = "subnet-8be41cec"
    args["Instances"]["EmrManagedMasterSecurityGroup"] = "master-security-group"
    args["Instances"]["EmrManagedSlaveSecurityGroup"] = "slave-security-group"
    args["Instances"]["KeepJobFlowAliveWhenNoSteps"] = False
    args["Instances"]["ServiceAccessSecurityGroup"] = "service-access-security-group"
    args["KerberosAttributes"] = {
        "Realm": "MY-REALM.COM",
        "KdcAdminPassword": "SuperSecretPassword2",
        "CrossRealmTrustPrincipalPassword": "SuperSecretPassword3",
        "ADDomainJoinUser": "Bob",
        "ADDomainJoinPassword": "SuperSecretPassword4",
    }
    args["Tags"] = [{"Key": "tag1", "Value": "val1"}, {"Key": "tag2", "Value": "val2"}]
    args["SecurityConfiguration"] = "my-security-configuration"

    cluster_id = client.run_job_flow(**args)["JobFlowId"]

    cl = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
    cl["Applications"][0]["Name"].should.equal("Spark")
    cl["Applications"][0]["Version"].should.equal("2.4.2")
    cl["AutoTerminate"].should.equal(True)

    config = cl["Configurations"][0]
    config["Classification"].should.equal("yarn-site")
    config["Properties"].should.equal(args["Configurations"][0]["Properties"])

    nested_config = cl["Configurations"][1]
    nested_config["Classification"].should.equal("nested-configs")
    nested_config["Properties"].should.equal(args["Configurations"][1]["Properties"])

    attrs = cl["Ec2InstanceAttributes"]
    attrs["AdditionalMasterSecurityGroups"].should.equal(
        args["Instances"]["AdditionalMasterSecurityGroups"]
    )
    attrs["AdditionalSlaveSecurityGroups"].should.equal(
        args["Instances"]["AdditionalSlaveSecurityGroups"]
    )
    attrs["Ec2AvailabilityZone"].should.equal("us-east-1a")
    attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"])
    attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"])
    attrs["EmrManagedMasterSecurityGroup"].should.equal(
        args["Instances"]["EmrManagedMasterSecurityGroup"]
    )
    attrs["EmrManagedSlaveSecurityGroup"].should.equal(
        args["Instances"]["EmrManagedSlaveSecurityGroup"]
    )
    attrs["IamInstanceProfile"].should.equal(args["JobFlowRole"])
    attrs["ServiceAccessSecurityGroup"].should.equal(
        args["Instances"]["ServiceAccessSecurityGroup"]
    )
    cl["Id"].should.equal(cluster_id)
    cl["KerberosAttributes"].should.equal(args["KerberosAttributes"])
    cl["LogUri"].should.equal(args["LogUri"])
    cl["MasterPublicDnsName"].should.be.a(str)
    cl["Name"].should.equal(args["Name"])
    cl["NormalizedInstanceHours"].should.equal(0)
    # cl['ReleaseLabel'].should.equal('emr-5.0.0')
    cl.shouldnt.have.key("RequestedAmiVersion")
    cl["RunningAmiVersion"].should.equal("1.0.0")
    cl["SecurityConfiguration"].should.be.a(str)
    cl["SecurityConfiguration"].should.equal(args["SecurityConfiguration"])
    cl["ServiceRole"].should.equal(args["ServiceRole"])

    status = cl["Status"]
    status["State"].should.equal("TERMINATED")
    # cluster['Status']['StateChangeReason']
    status["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
    # status['Timeline']['EndDateTime'].should.equal(datetime(2014, 1, 24, 2, 19, 46, tzinfo=pytz.utc))
    status["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")

    dict((t["Key"], t["Value"]) for t in cl["Tags"]).should.equal(
        dict((t["Key"], t["Value"]) for t in args["Tags"])
    )

    cl["TerminationProtected"].should.equal(False)
    cl["VisibleToAllUsers"].should.equal(True)
    cl["ClusterArn"].should.equal(
        "arn:aws:elasticmapreduce:{0}:{1}:cluster/{2}".format(
            region_name, ACCOUNT_ID, cluster_id
        )
    )


@mock_emr
def test_describe_cluster_not_found():
    conn = boto3.client("emr", region_name="us-east-1")
    with pytest.raises(ClientError) as ex:
        conn.describe_cluster(ClusterId="DummyId")
    ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")


@mock_emr
def test_describe_job_flows():
    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    expected = {}

    for idx in range(4):
        cluster_name = "cluster" + str(idx)
        args["Name"] = cluster_name
        cluster_id = client.run_job_flow(**args)["JobFlowId"]
        expected[cluster_id] = {
            "Id": cluster_id,
            "Name": cluster_name,
            "State": "WAITING",
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    for idx in range(4, 6):
        cluster_name = "cluster" + str(idx)
        args["Name"] = cluster_name
        cluster_id = client.run_job_flow(**args)["JobFlowId"]
        client.terminate_job_flows(JobFlowIds=[cluster_id])
        expected[cluster_id] = {
            "Id": cluster_id,
            "Name": cluster_name,
            "State": "TERMINATED",
        }

    resp = client.describe_job_flows()
    resp["JobFlows"].should.have.length_of(6)

    for cluster_id in expected:
        resp = client.describe_job_flows(JobFlowIds=[cluster_id])
        resp["JobFlows"].should.have.length_of(1)
        resp["JobFlows"][0]["JobFlowId"].should.equal(cluster_id)

    resp = client.describe_job_flows(JobFlowStates=["WAITING"])
    resp["JobFlows"].should.have.length_of(4)
    for x in resp["JobFlows"]:
        x["ExecutionStatusDetail"]["State"].should.equal("WAITING")
    resp = client.describe_job_flows(CreatedBefore=timestamp)
    resp["JobFlows"].should.have.length_of(4)

    resp = client.describe_job_flows(CreatedAfter=timestamp)
    resp["JobFlows"].should.have.length_of(2)


@mock_emr
def test_describe_job_flow():
    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    args["AmiVersion"] = "3.8.1"
    args["Instances"].update(
        {
            "Ec2KeyName": "ec2keyname",
            "Ec2SubnetId": "subnet-8be41cec",
            "HadoopVersion": "2.4.0",
        }
    )
    args["VisibleToAllUsers"] = True

    cluster_id = client.run_job_flow(**args)["JobFlowId"]

    jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]

    jf["AmiVersion"].should.equal(args["AmiVersion"])
    jf.shouldnt.have.key("BootstrapActions")

    esd = jf["ExecutionStatusDetail"]
    esd["CreationDateTime"].should.be.a("datetime.datetime")
    # esd['EndDateTime'].should.be.a('datetime.datetime')
    # esd['LastStateChangeReason'].should.be.a(str)
    esd["ReadyDateTime"].should.be.a("datetime.datetime")
    esd["StartDateTime"].should.be.a("datetime.datetime")
    esd["State"].should.equal("WAITING")

    attrs = jf["Instances"]
    attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"])
    attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"])
    attrs["HadoopVersion"].should.equal(args["Instances"]["HadoopVersion"])
    attrs["InstanceCount"].should.equal(args["Instances"]["InstanceCount"])

    for ig in attrs["InstanceGroups"]:
        # ig['BidPrice']
        ig["CreationDateTime"].should.be.a("datetime.datetime")
        # ig['EndDateTime'].should.be.a('datetime.datetime')
        ig["InstanceGroupId"].should.be.a(str)
        ig["InstanceRequestCount"].should.be.a(int)
        ig["InstanceRole"].should.be.within(["MASTER", "CORE"])
        ig["InstanceRunningCount"].should.be.a(int)
        ig["InstanceType"].should.be.within(["c3.medium", "c3.xlarge"])
        # ig['LastStateChangeReason'].should.be.a(str)
        ig["Market"].should.equal("ON_DEMAND")
        ig["Name"].should.be.a(str)
        ig["ReadyDateTime"].should.be.a("datetime.datetime")
        ig["StartDateTime"].should.be.a("datetime.datetime")
        ig["State"].should.equal("RUNNING")

    attrs["KeepJobFlowAliveWhenNoSteps"].should.equal(True)
    # attrs['MasterInstanceId'].should.be.a(str)
    attrs["MasterInstanceType"].should.equal(args["Instances"]["MasterInstanceType"])
    attrs["MasterPublicDnsName"].should.be.a(str)
    attrs["NormalizedInstanceHours"].should.equal(0)
    attrs["Placement"]["AvailabilityZone"].should.equal(
        args["Instances"]["Placement"]["AvailabilityZone"]
    )
    attrs["SlaveInstanceType"].should.equal(args["Instances"]["SlaveInstanceType"])
    attrs["TerminationProtected"].should.equal(False)

    jf["JobFlowId"].should.equal(cluster_id)
    jf["JobFlowRole"].should.equal(args["JobFlowRole"])
    jf["LogUri"].should.equal(args["LogUri"])
    jf["Name"].should.equal(args["Name"])
    jf["ServiceRole"].should.equal(args["ServiceRole"])
    jf["Steps"].should.equal([])
    jf["SupportedProducts"].should.equal([])
    jf["VisibleToAllUsers"].should.equal(True)


@mock_emr
def test_list_clusters():
    client = boto3.client("emr", region_name="us-east-1")
    args = deepcopy(run_job_flow_args)
    expected = {}

    for idx in range(40):
        cluster_name = "jobflow" + str(idx)
        args["Name"] = cluster_name
        cluster_id = client.run_job_flow(**args)["JobFlowId"]
        expected[cluster_id] = {
            "Id": cluster_id,
            "Name": cluster_name,
            "NormalizedInstanceHours": 0,
            "State": "WAITING",
        }

    # need sleep since it appears the timestamp is always rounded to
    # the nearest second internally
    time.sleep(1)
    timestamp = datetime.now(pytz.utc)
    time.sleep(1)

    for idx in range(40, 70):
        cluster_name = "jobflow" + str(idx)
        args["Name"] = cluster_name
        cluster_id = client.run_job_flow(**args)["JobFlowId"]
        client.terminate_job_flows(JobFlowIds=[cluster_id])
        expected[cluster_id] = {
            "Id": cluster_id,
            "Name": cluster_name,
            "NormalizedInstanceHours": 0,
            "State": "TERMINATED",
        }

    args = {}
    while True:
        resp = client.list_clusters(**args)
        clusters = resp["Clusters"]
        len(clusters).should.be.lower_than_or_equal_to(50)
        for x in clusters:
            y = expected[x["Id"]]
            x["Id"].should.equal(y["Id"])
            x["Name"].should.equal(y["Name"])
            x["NormalizedInstanceHours"].should.equal(y["NormalizedInstanceHours"])
            x["Status"]["State"].should.equal(y["State"])
            x["Status"]["Timeline"]["CreationDateTime"].should.be.a(
                "datetime.datetime"
            )
            if y["State"] == "TERMINATED":
                x["Status"]["Timeline"]["EndDateTime"].should.be.a(
                    "datetime.datetime"
                )
            else:
                x["Status"]["Timeline"].shouldnt.have.key("EndDateTime")
            x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")
        marker = resp.get("Marker")
        if marker is None:
            break
        args = {"Marker": marker}

    resp = client.list_clusters(ClusterStates=["TERMINATED"])
    resp["Clusters"].should.have.length_of(30)
    for x in resp["Clusters"]:
        x["Status"]["State"].should.equal("TERMINATED")

    resp = client.list_clusters(CreatedBefore=timestamp)
    resp["Clusters"].should.have.length_of(40)

    resp = client.list_clusters(CreatedAfter=timestamp)
    resp["Clusters"].should.have.length_of(30)


@mock_emr
def test_run_job_flow():
    region_name = "us-east-1"
    client = boto3.client("emr", region_name=region_name)
    args = deepcopy(run_job_flow_args)
    resp = client.run_job_flow(**args)
    resp["ClusterArn"].startswith(
        "arn:aws:elasticmapreduce:{0}:{1}:cluster/".format(region_name, ACCOUNT_ID)
    ).should.equal(True)
    job_flow_id = resp["JobFlowId"]
    resp = client.describe_job_flows(JobFlowIds=[job_flow_id])["JobFlows"][0]
    resp["ExecutionStatusDetail"]["State"].should.equal("WAITING")
    resp["JobFlowId"].should.equal(job_flow_id)
    resp["Name"].should.equal(args["Name"])
    resp["Instances"]["MasterInstanceType"].should.equal(
        args["Instances"]["MasterInstanceType"]
    )
    resp["Instances"]["SlaveInstanceType"].should.equal(
        args["Instances"]["SlaveInstanceType"]
    )
    resp["LogUri"].should.equal(args["LogUri"])
    resp["VisibleToAllUsers"].should.equal(args["VisibleToAllUsers"])
    resp["Instances"]["NormalizedInstanceHours"].should.equal(0)
    resp["Steps"].should.equal([])


@mock_emr
def test_run_job_flow_with_invalid_params():
    client = boto3.client("emr", region_name="us-east-1")
    with pytest.raises(ClientError) as ex:
        # cannot set both AmiVersion and ReleaseLabel
        args = deepcopy(run_job_flow_args)
        args["AmiVersion"] = "2.4"
        args["ReleaseLabel"] = "emr-5.0.0"
        client.run_job_flow(**args)
    ex.value.response["Error"]["Code"].should.equal("ValidationException")


@mock_emr
def test_run_job_flow_in_multiple_regions():
    regions = {}
    for region in ["us-east-1", "eu-west-1"]:
        client = boto3.client("emr", region_name=region)
        args = deepcopy(run_job_flow_args)
        args["Name"] = region
        cluster_id = client.run_job_flow(**args)["JobFlowId"]
        regions[region] = {"client": client, "cluster_id": cluster_id}

    for region in regions.keys():
        client = regions[region]["client"]
        resp = client.describe_cluster(ClusterId=regions[region]["cluster_id"])
        resp["Cluster"]["Name"].should.equal(region)


@mock_emr
def test_run_job_flow_with_new_params():
    client = boto3.client("emr", region_name="us-east-1")
    resp = client.run_job_flow(**run_job_flow_args)
    resp.should.have.key("JobFlowId")


@mock_emr
def test_run_job_flow_with_visible_to_all_users():
    client = boto3.client("emr", region_name="us-east-1")
    for expected in (True, False):
        args = deepcopy(run_job_flow_args)
args["VisibleToAllUsers"] = expected resp = client.run_job_flow(**args) cluster_id = resp["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) def _do_assertion_ebs_configuration(x, y): total_volumes = 0 total_size = 0 for ebs_block in y["EbsConfiguration"]["EbsBlockDeviceConfigs"]: total_volumes += ebs_block["VolumesPerInstance"] total_size += ebs_block["VolumeSpecification"]["SizeInGB"] # Multiply by total volumes total_size = total_size * total_volumes comp_total_size = 0 for ebs_block in x["EbsBlockDevices"]: comp_total_size += ebs_block["VolumeSpecification"]["SizeInGB"] len(x["EbsBlockDevices"]).should.equal(total_volumes) comp_total_size.should.equal(comp_total_size) @mock_emr def test_run_job_flow_with_instance_groups(): input_groups = dict((g["Name"], g) for g in input_instance_groups) client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["Instances"] = {"InstanceGroups": input_instance_groups} cluster_id = client.run_job_flow(**args)["JobFlowId"] groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] for x in groups: y = input_groups[x["Name"]] x.should.have.key("Id") x["RequestedInstanceCount"].should.equal(y["InstanceCount"]) x["InstanceGroupType"].should.equal(y["InstanceRole"]) x["InstanceType"].should.equal(y["InstanceType"]) x["Market"].should.equal(y["Market"]) if "BidPrice" in y: x["BidPrice"].should.equal(y["BidPrice"]) if "EbsConfiguration" in y: _do_assertion_ebs_configuration(x, y) auto_scaling_policy = { "Constraints": {"MinCapacity": 2, "MaxCapacity": 10}, "Rules": [ { "Name": "Default-scale-out", "Description": "Replicates the default scale-out rule in the console for YARN memory.", "Action": { "SimpleScalingPolicyConfiguration": { "AdjustmentType": "CHANGE_IN_CAPACITY", "ScalingAdjustment": 1, "CoolDown": 300, } }, "Trigger": { "CloudWatchAlarmDefinition": { "ComparisonOperator": "LESS_THAN", "EvaluationPeriods": 1, "MetricName": "YARNMemoryAvailablePercentage", "Namespace": "AWS/ElasticMapReduce", "Period": 300, "Threshold": 15.0, "Statistic": "AVERAGE", "Unit": "PERCENT", "Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}], } }, } ], } @mock_emr def test_run_job_flow_with_instance_groups_with_autoscaling(): input_groups = dict((g["Name"], g) for g in input_instance_groups) input_groups["core"]["AutoScalingPolicy"] = auto_scaling_policy input_groups["task-1"]["AutoScalingPolicy"] = auto_scaling_policy client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["Instances"] = {"InstanceGroups": input_instance_groups} cluster_id = client.run_job_flow(**args)["JobFlowId"] groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] for x in groups: y = deepcopy(input_groups[x["Name"]]) if "AutoScalingPolicy" in y: x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED") returned_policy = deepcopy(x["AutoScalingPolicy"]) auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy( y["AutoScalingPolicy"], cluster_id ) del returned_policy["Status"] returned_policy.should.equal(auto_scaling_policy_with_cluster_id) @mock_emr def test_put_remove_auto_scaling_policy(): region_name = "us-east-1" input_groups = dict((g["Name"], g) for g in input_instance_groups) client = boto3.client("emr", region_name=region_name) args = deepcopy(run_job_flow_args) args["Instances"] = {"InstanceGroups": input_instance_groups} cluster_id = 
client.run_job_flow(**args)["JobFlowId"] core_instance_group = [ ig for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] if ig["InstanceGroupType"] == "CORE" ][0] resp = client.put_auto_scaling_policy( ClusterId=cluster_id, InstanceGroupId=core_instance_group["Id"], AutoScalingPolicy=auto_scaling_policy, ) auto_scaling_policy_with_cluster_id = _patch_cluster_id_placeholder_in_autoscaling_policy( auto_scaling_policy, cluster_id ) del resp["AutoScalingPolicy"]["Status"] resp["AutoScalingPolicy"].should.equal(auto_scaling_policy_with_cluster_id) resp["ClusterArn"].should.equal( "arn:aws:elasticmapreduce:{0}:{1}:cluster/{2}".format( region_name, ACCOUNT_ID, cluster_id ) ) core_instance_group = [ ig for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] if ig["InstanceGroupType"] == "CORE" ][0] ("AutoScalingPolicy" in core_instance_group).should.equal(True) client.remove_auto_scaling_policy( ClusterId=cluster_id, InstanceGroupId=core_instance_group["Id"] ) core_instance_group = [ ig for ig in client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] if ig["InstanceGroupType"] == "CORE" ][0] ("AutoScalingPolicy" not in core_instance_group).should.equal(True) def _patch_cluster_id_placeholder_in_autoscaling_policy( auto_scaling_policy, cluster_id ): policy_copy = deepcopy(auto_scaling_policy) for rule in policy_copy["Rules"]: for dimension in rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"]: dimension["Value"] = cluster_id return policy_copy @mock_emr def test_run_job_flow_with_custom_ami(): client = boto3.client("emr", region_name="us-east-1") with pytest.raises(ClientError) as ex: # CustomAmiId available in Amazon EMR 5.7.0 and later args = deepcopy(run_job_flow_args) args["CustomAmiId"] = "MyEmrCustomId" args["ReleaseLabel"] = "emr-5.6.0" client.run_job_flow(**args) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.equal("Custom AMI is not allowed") with pytest.raises(ClientError) as ex: args = deepcopy(run_job_flow_args) args["CustomAmiId"] = "MyEmrCustomId" args["AmiVersion"] = "3.8.1" client.run_job_flow(**args) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.equal( "Custom AMI is not supported in this version of EMR" ) with pytest.raises(ClientError) as ex: # AMI version and release label exception raises before CustomAmi exception args = deepcopy(run_job_flow_args) args["CustomAmiId"] = "MyEmrCustomId" args["ReleaseLabel"] = "emr-5.6.0" args["AmiVersion"] = "3.8.1" client.run_job_flow(**args) ex.value.response["Error"]["Code"].should.equal("ValidationException") ex.value.response["Error"]["Message"].should.contain( "Only one AMI version and release label may be specified." 
) args = deepcopy(run_job_flow_args) args["CustomAmiId"] = "MyEmrCustomAmi" args["ReleaseLabel"] = "emr-5.31.0" cluster_id = client.run_job_flow(**args)["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["CustomAmiId"].should.equal("MyEmrCustomAmi") @mock_emr def test_run_job_flow_with_step_concurrency(): client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["StepConcurrencyLevel"] = 2 cluster_id = client.run_job_flow(**args)["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] resp["Name"].should.equal(args["Name"]) resp["Status"]["State"].should.equal("WAITING") resp["StepConcurrencyLevel"].should.equal(2) @mock_emr def test_modify_cluster(): client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["StepConcurrencyLevel"] = 2 cluster_id = client.run_job_flow(**args)["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] resp["Name"].should.equal(args["Name"]) resp["Status"]["State"].should.equal("WAITING") resp["StepConcurrencyLevel"].should.equal(2) resp = client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=4) resp["StepConcurrencyLevel"].should.equal(4) resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] resp["StepConcurrencyLevel"].should.equal(4) @mock_emr def test_set_termination_protection(): client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["Instances"]["TerminationProtected"] = False resp = client.run_job_flow(**args) cluster_id = resp["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["TerminationProtected"].should.equal(False) for expected in (True, False): resp = client.set_termination_protection( JobFlowIds=[cluster_id], TerminationProtected=expected ) resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["TerminationProtected"].should.equal(expected) @mock_emr def test_terminate_protected_job_flow_raises_error(): client = boto3.client("emr", region_name="us-east-1") resp = client.run_job_flow(**run_job_flow_args) cluster_id = resp["JobFlowId"] client.set_termination_protection( JobFlowIds=[cluster_id], TerminationProtected=True ) with pytest.raises(ClientError) as ex: client.terminate_job_flows( JobFlowIds=[cluster_id,] ) error = ex.value.response["Error"] error["Code"].should.equal("ValidationException") error["Message"].should.equal( "Could not shut down one or more job flows since they are termination protected." 
) @mock_emr def test_set_visible_to_all_users(): client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["VisibleToAllUsers"] = False resp = client.run_job_flow(**args) cluster_id = resp["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["VisibleToAllUsers"].should.equal(False) for expected in (True, False): resp = client.set_visible_to_all_users( JobFlowIds=[cluster_id], VisibleToAllUsers=expected ) resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) @mock_emr def test_terminate_job_flows(): client = boto3.client("emr", region_name="us-east-1") resp = client.run_job_flow(**run_job_flow_args) cluster_id = resp["JobFlowId"] resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["Status"]["State"].should.equal("WAITING") resp = client.terminate_job_flows(JobFlowIds=[cluster_id]) resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["Status"]["State"].should.equal("TERMINATED") # testing multiple end points for each feature @mock_emr def test_bootstrap_actions(): bootstrap_actions = [ { "Name": "bs1", "ScriptBootstrapAction": { "Args": ["arg1", "arg2"], "Path": "s3://path/to/script", }, }, { "Name": "bs2", "ScriptBootstrapAction": {"Args": [], "Path": "s3://path/to/anotherscript"}, }, ] client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["BootstrapActions"] = bootstrap_actions cluster_id = client.run_job_flow(**args)["JobFlowId"] cl = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] for x, y in zip(cl["BootstrapActions"], bootstrap_actions): x["BootstrapActionConfig"].should.equal(y) resp = client.list_bootstrap_actions(ClusterId=cluster_id) for x, y in zip(resp["BootstrapActions"], bootstrap_actions): x["Name"].should.equal(y["Name"]) if "Args" in y["ScriptBootstrapAction"]: x["Args"].should.equal(y["ScriptBootstrapAction"]["Args"]) x["ScriptPath"].should.equal(y["ScriptBootstrapAction"]["Path"]) @mock_emr def test_instances(): input_groups = dict((g["Name"], g) for g in input_instance_groups) client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["Instances"] = {"InstanceGroups": input_instance_groups} cluster_id = client.run_job_flow(**args)["JobFlowId"] jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] instances = client.list_instances(ClusterId=cluster_id)["Instances"] len(instances).should.equal(sum(g["InstanceCount"] for g in input_instance_groups)) for x in instances: x.should.have.key("InstanceGroupId") instance_group = [ j for j in jf["Instances"]["InstanceGroups"] if j["InstanceGroupId"] == x["InstanceGroupId"] ] len(instance_group).should.equal(1) y = input_groups[instance_group[0]["Name"]] x.should.have.key("Id") x.should.have.key("Ec2InstanceId") x.should.have.key("PublicDnsName") x.should.have.key("PublicIpAddress") x.should.have.key("PrivateDnsName") x.should.have.key("PrivateIpAddress") x.should.have.key("InstanceFleetId") x["InstanceType"].should.equal(y["InstanceType"]) x["Market"].should.equal(y["Market"]) x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") x["Status"]["State"].should.equal("RUNNING") for x in [["MASTER"], ["CORE"], ["TASK"], ["MASTER", "TASK"]]: instances = client.list_instances(ClusterId=cluster_id, InstanceGroupTypes=x)[ "Instances" ] len(instances).should.equal( sum( 
g["InstanceCount"] for g in input_instance_groups if g["InstanceRole"] in x ) ) @mock_emr def test_instance_groups(): input_groups = dict((g["Name"], g) for g in input_instance_groups) client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) for key in ["MasterInstanceType", "SlaveInstanceType", "InstanceCount"]: del args["Instances"][key] args["Instances"]["InstanceGroups"] = input_instance_groups[:2] cluster_id = client.run_job_flow(**args)["JobFlowId"] jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] base_instance_count = jf["Instances"]["InstanceCount"] instance_groups_to_add = deepcopy(input_instance_groups[2:]) instance_groups_to_add[0]["AutoScalingPolicy"] = auto_scaling_policy instance_groups_to_add[1]["AutoScalingPolicy"] = auto_scaling_policy client.add_instance_groups( JobFlowId=cluster_id, InstanceGroups=instance_groups_to_add ) jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] jf["Instances"]["InstanceCount"].should.equal( sum(g["InstanceCount"] for g in input_instance_groups) ) for x in jf["Instances"]["InstanceGroups"]: y = input_groups[x["Name"]] if "BidPrice" in y: x["BidPrice"].should.equal(y["BidPrice"]) x["CreationDateTime"].should.be.a("datetime.datetime") # x['EndDateTime'].should.be.a('datetime.datetime') x.should.have.key("InstanceGroupId") x["InstanceRequestCount"].should.equal(y["InstanceCount"]) x["InstanceRole"].should.equal(y["InstanceRole"]) x["InstanceRunningCount"].should.equal(y["InstanceCount"]) x["InstanceType"].should.equal(y["InstanceType"]) # x['LastStateChangeReason'].should.equal(y['LastStateChangeReason']) x["Market"].should.equal(y["Market"]) x["Name"].should.equal(y["Name"]) x["ReadyDateTime"].should.be.a("datetime.datetime") x["StartDateTime"].should.be.a("datetime.datetime") x["State"].should.equal("RUNNING") groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] for x in groups: y = deepcopy(input_groups[x["Name"]]) if "BidPrice" in y: x["BidPrice"].should.equal(y["BidPrice"]) if "AutoScalingPolicy" in y: x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED") returned_policy = dict(x["AutoScalingPolicy"]) del returned_policy["Status"] policy = json.loads( json.dumps(y["AutoScalingPolicy"]).replace( "${emr.clusterId}", cluster_id ) ) returned_policy.should.equal(policy) if "EbsConfiguration" in y: _do_assertion_ebs_configuration(x, y) # Configurations # EbsBlockDevices # EbsOptimized x.should.have.key("Id") x["InstanceGroupType"].should.equal(y["InstanceRole"]) x["InstanceType"].should.equal(y["InstanceType"]) x["Market"].should.equal(y["Market"]) x["Name"].should.equal(y["Name"]) x["RequestedInstanceCount"].should.equal(y["InstanceCount"]) x["RunningInstanceCount"].should.equal(y["InstanceCount"]) # ShrinkPolicy x["Status"]["State"].should.equal("RUNNING") x["Status"]["StateChangeReason"]["Code"].should.be.a(str) # x['Status']['StateChangeReason']['Message'].should.be.a(str) x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime") igs = dict((g["Name"], g) for g in groups) client.modify_instance_groups( InstanceGroups=[ {"InstanceGroupId": igs["task-1"]["Id"], "InstanceCount": 2}, {"InstanceGroupId": igs["task-2"]["Id"], "InstanceCount": 3}, ] ) jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] 
jf["Instances"]["InstanceCount"].should.equal(base_instance_count + 5) igs = dict((g["Name"], g) for g in jf["Instances"]["InstanceGroups"]) igs["task-1"]["InstanceRunningCount"].should.equal(2) igs["task-2"]["InstanceRunningCount"].should.equal(3) @mock_emr def test_steps(): input_steps = [ { "HadoopJarStep": { "Args": [ "hadoop-streaming", "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py#wordSplitter.py", "-mapper", "python wordSplitter.py", "-input", "s3://elasticmapreduce/samples/wordcount/input", "-output", "s3://output_bucket/output/wordcount_output", "-reducer", "aggregate", ], "Jar": "command-runner.jar", }, "Name": "My wordcount example", }, { "HadoopJarStep": { "Args": [ "hadoop-streaming", "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter2.py#wordSplitter2.py", "-mapper", "python wordSplitter2.py", "-input", "s3://elasticmapreduce/samples/wordcount/input2", "-output", "s3://output_bucket/output/wordcount_output2", "-reducer", "aggregate", ], "Jar": "command-runner.jar", }, "Name": "My wordcount example2", }, ] # TODO: implementation and test for cancel_steps client = boto3.client("emr", region_name="us-east-1") args = deepcopy(run_job_flow_args) args["Steps"] = [input_steps[0]] cluster_id = client.run_job_flow(**args)["JobFlowId"] jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] jf["Steps"].should.have.length_of(1) client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[input_steps[1]]) jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0] jf["Steps"].should.have.length_of(2) for idx, (x, y) in enumerate(zip(jf["Steps"], input_steps)): x["ExecutionStatusDetail"].should.have.key("CreationDateTime") # x['ExecutionStatusDetail'].should.have.key('EndDateTime') # x['ExecutionStatusDetail'].should.have.key('LastStateChangeReason') # x['ExecutionStatusDetail'].should.have.key('StartDateTime') x["ExecutionStatusDetail"]["State"].should.equal( "STARTING" if idx == 0 else "PENDING" ) x["StepConfig"]["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") x["StepConfig"]["HadoopJarStep"]["Args"].should.equal( y["HadoopJarStep"]["Args"] ) x["StepConfig"]["HadoopJarStep"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) if "MainClass" in y["HadoopJarStep"]: x["StepConfig"]["HadoopJarStep"]["MainClass"].should.equal( y["HadoopJarStep"]["MainClass"] ) if "Properties" in y["HadoopJarStep"]: x["StepConfig"]["HadoopJarStep"]["Properties"].should.equal( y["HadoopJarStep"]["Properties"] ) x["StepConfig"]["Name"].should.equal(y["Name"]) expected = dict((s["Name"], s) for s in input_steps) steps = client.list_steps(ClusterId=cluster_id)["Steps"] steps.should.have.length_of(2) for x in steps: y = expected[x["Name"]] x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"]) x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) # Properties x["Id"].should.be.a(str) x["Name"].should.equal(y["Name"]) x["Status"]["State"].should.be.within(["STARTING", "PENDING"]) # StateChangeReason x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') # Only the first step will have started - we don't know anything about when it finishes, so the second step never starts if x["Name"] == "My wordcount example": x["Status"]["Timeline"]["StartDateTime"].should.be.a("datetime.datetime") x = client.describe_step(ClusterId=cluster_id, 
StepId=x["Id"])["Step"] x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER") x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"]) x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"]) # x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass']) # Properties x["Id"].should.be.a(str) x["Name"].should.equal(y["Name"]) x["Status"]["State"].should.be.within(["STARTING", "PENDING"]) # StateChangeReason x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime") # x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime') # x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime') step_id = steps[0]["Id"] steps = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])["Steps"] steps.should.have.length_of(1) steps[0]["Id"].should.equal(step_id) steps = client.list_steps(ClusterId=cluster_id, StepStates=["STARTING"])["Steps"] steps.should.have.length_of(1) steps[0]["Id"].should.equal(step_id) @mock_emr def test_tags(): input_tags = [ {"Key": "newkey1", "Value": "newval1"}, {"Key": "newkey2", "Value": "newval2"}, ] client = boto3.client("emr", region_name="us-east-1") cluster_id = client.run_job_flow(**run_job_flow_args)["JobFlowId"] client.add_tags(ResourceId=cluster_id, Tags=input_tags) resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] resp["Tags"].should.have.length_of(2) dict((t["Key"], t["Value"]) for t in resp["Tags"]).should.equal( dict((t["Key"], t["Value"]) for t in input_tags) ) client.remove_tags(ResourceId=cluster_id, TagKeys=[t["Key"] for t in input_tags]) resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"] resp["Tags"].should.equal([]) @mock_emr def test_security_configurations(): client = boto3.client("emr", region_name="us-east-1") security_configuration_name = "MySecurityConfiguration" security_configuration = """ { "EncryptionConfiguration": { "AtRestEncryptionConfiguration": { "S3EncryptionConfiguration": { "EncryptionMode": "SSE-S3" } }, "EnableInTransitEncryption": false, "EnableAtRestEncryption": true } } """.strip() resp = client.create_security_configuration( Name=security_configuration_name, SecurityConfiguration=security_configuration ) resp["Name"].should.equal(security_configuration_name) resp["CreationDateTime"].should.be.a("datetime.datetime") resp = client.describe_security_configuration(Name=security_configuration_name) resp["Name"].should.equal(security_configuration_name) resp["SecurityConfiguration"].should.equal(security_configuration) resp["CreationDateTime"].should.be.a("datetime.datetime") client.delete_security_configuration(Name=security_configuration_name) with pytest.raises(ClientError) as ex: client.describe_security_configuration(Name=security_configuration_name) ex.value.response["Error"]["Code"].should.equal("InvalidRequestException") ex.value.response["Error"]["Message"].should.match( r"Security configuration with name .* does not exist." ) with pytest.raises(ClientError) as ex: client.delete_security_configuration(Name=security_configuration_name) ex.value.response["Error"]["Code"].should.equal("InvalidRequestException") ex.value.response["Error"]["Message"].should.match( r"Security configuration with name .* does not exist." 
) @mock_emr def test_run_job_flow_with_invalid_number_of_master_nodes_raises_error(): client = boto3.client("emr", region_name="us-east-1") params = dict( Name="test-cluster", Instances={ "InstanceGroups": [ { "InstanceCount": 2, "InstanceRole": "MASTER", "InstanceType": "c1.medium", "Market": "ON_DEMAND", "Name": "master", } ] }, ) with pytest.raises(ClientError) as ex: client.run_job_flow(**params) error = ex.value.response["Error"] error["Code"].should.equal("ValidationException") error["Message"].should.equal( "Master instance group must have exactly 3 instances for HA clusters." ) @mock_emr def test_run_job_flow_with_multiple_master_nodes(): client = boto3.client("emr", region_name="us-east-1") params = dict( Name="test-cluster", Instances={ "InstanceGroups": [ { "InstanceCount": 3, "InstanceRole": "MASTER", "InstanceType": "c1.medium", "Market": "ON_DEMAND", "Name": "master", } ], "KeepJobFlowAliveWhenNoSteps": False, "TerminationProtected": False, }, ) cluster_id = client.run_job_flow(**params)["JobFlowId"] cluster = client.describe_cluster(ClusterId=cluster_id)["Cluster"] cluster["AutoTerminate"].should.equal(False) cluster["TerminationProtected"].should.equal(True) groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"] master_instance_group = next( group for group in groups if group["InstanceGroupType"] == "MASTER" ) master_instance_group["RequestedInstanceCount"].should.equal(3) master_instance_group["RunningInstanceCount"].should.equal(3)