Techdebt: Replace sure with regular assertions in EMR (#6561)

Bert Blommers 2023-07-26 09:42:43 +00:00 committed by GitHub
parent fe6df96e34
commit 11753adbf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 310 additions and 301 deletions
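
The conversion in the diff below is mechanical: every sure chain is rewritten as a plain assert. A minimal sketch of the mappings used throughout (the response dict here is made up for illustration and is not taken from the changed files):

    from datetime import datetime, timezone

    resp = {
        "Name": "Spark",
        "Tags": [{"Key": "team", "Value": "emr"}],
        "CreationDateTime": datetime.now(timezone.utc),
    }

    assert resp["Name"] == "Spark"                         # was: .should.equal("Spark")
    assert isinstance(resp["CreationDateTime"], datetime)  # was: .should.be.a("datetime.datetime")
    assert "Name" in resp                                  # was: .should.have.key("Name")
    assert "Missing" not in resp                           # was: .shouldnt.have.key("Missing")
    assert len(resp["Tags"]) == 1                          # was: .should.have.length_of(1)
    assert resp["Name"] in ["Spark", "Hive"]               # was: .should.be.within(["Spark", "Hive"])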


@@ -1,11 +1,9 @@
-# -*- coding: utf-8 -*-
 import time
 from copy import deepcopy
 from datetime import datetime, timezone
 import boto3
 import json
-import sure # noqa # pylint: disable=unused-import
 from botocore.exceptions import ClientError
 import pytest
@@ -122,67 +120,73 @@ def test_describe_cluster():
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 cl = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-cl["Applications"][0]["Name"].should.equal("Spark")
+assert cl["Applications"][0]["Name"] == "Spark"
-cl["Applications"][0]["Version"].should.equal("2.4.2")
+assert cl["Applications"][0]["Version"] == "2.4.2"
-cl["AutoTerminate"].should.equal(True)
+assert cl["AutoTerminate"] is True
 config = cl["Configurations"][0]
-config["Classification"].should.equal("yarn-site")
+assert config["Classification"] == "yarn-site"
-config["Properties"].should.equal(args["Configurations"][0]["Properties"])
+assert config["Properties"] == args["Configurations"][0]["Properties"]
 nested_config = cl["Configurations"][1]
-nested_config["Classification"].should.equal("nested-configs")
+assert nested_config["Classification"] == "nested-configs"
-nested_config["Properties"].should.equal(args["Configurations"][1]["Properties"])
+assert nested_config["Properties"] == args["Configurations"][1]["Properties"]
 attrs = cl["Ec2InstanceAttributes"]
-attrs["AdditionalMasterSecurityGroups"].should.equal(
-args["Instances"]["AdditionalMasterSecurityGroups"]
-)
+assert (
+attrs["AdditionalMasterSecurityGroups"]
+== args["Instances"]["AdditionalMasterSecurityGroups"]
+)
-attrs["AdditionalSlaveSecurityGroups"].should.equal(
-args["Instances"]["AdditionalSlaveSecurityGroups"]
-)
+assert (
+attrs["AdditionalSlaveSecurityGroups"]
+== args["Instances"]["AdditionalSlaveSecurityGroups"]
+)
-attrs["Ec2AvailabilityZone"].should.equal("us-east-1a")
+assert attrs["Ec2AvailabilityZone"] == "us-east-1a"
-attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"])
+assert attrs["Ec2KeyName"] == args["Instances"]["Ec2KeyName"]
-attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"])
+assert attrs["Ec2SubnetId"] == args["Instances"]["Ec2SubnetId"]
-attrs["EmrManagedMasterSecurityGroup"].should.equal(
-args["Instances"]["EmrManagedMasterSecurityGroup"]
-)
+assert (
+attrs["EmrManagedMasterSecurityGroup"]
+== args["Instances"]["EmrManagedMasterSecurityGroup"]
+)
-attrs["EmrManagedSlaveSecurityGroup"].should.equal(
-args["Instances"]["EmrManagedSlaveSecurityGroup"]
-)
+assert (
+attrs["EmrManagedSlaveSecurityGroup"]
+== args["Instances"]["EmrManagedSlaveSecurityGroup"]
+)
-attrs["IamInstanceProfile"].should.equal(args["JobFlowRole"])
+assert attrs["IamInstanceProfile"] == args["JobFlowRole"]
-attrs["ServiceAccessSecurityGroup"].should.equal(
-args["Instances"]["ServiceAccessSecurityGroup"]
-)
+assert (
+attrs["ServiceAccessSecurityGroup"]
+== args["Instances"]["ServiceAccessSecurityGroup"]
+)
-cl["Id"].should.equal(cluster_id)
+assert cl["Id"] == cluster_id
-cl["KerberosAttributes"].should.equal(args["KerberosAttributes"])
+assert cl["KerberosAttributes"] == args["KerberosAttributes"]
-cl["LogUri"].should.equal(args["LogUri"])
+assert cl["LogUri"] == args["LogUri"]
-cl["MasterPublicDnsName"].should.be.a(str)
+assert isinstance(cl["MasterPublicDnsName"], str)
-cl["Name"].should.equal(args["Name"])
+assert cl["Name"] == args["Name"]
-cl["NormalizedInstanceHours"].should.equal(0)
+assert cl["NormalizedInstanceHours"] == 0
-# cl['ReleaseLabel'].should.equal('emr-5.0.0')
+# assert cl['ReleaseLabel'] == 'emr-5.0.0'
-cl.shouldnt.have.key("RequestedAmiVersion")
+assert "RequestedAmiVersion" not in cl
-cl["RunningAmiVersion"].should.equal("1.0.0")
+assert cl["RunningAmiVersion"] == "1.0.0"
-cl["SecurityConfiguration"].should.be.a(str)
+assert isinstance(cl["SecurityConfiguration"], str)
-cl["SecurityConfiguration"].should.equal(args["SecurityConfiguration"])
+assert cl["SecurityConfiguration"] == args["SecurityConfiguration"]
-cl["ServiceRole"].should.equal(args["ServiceRole"])
+assert cl["ServiceRole"] == args["ServiceRole"]
-cl["AutoScalingRole"].should.equal(args["AutoScalingRole"])
+assert cl["AutoScalingRole"] == args["AutoScalingRole"]
 status = cl["Status"]
-status["State"].should.equal("TERMINATED")
+assert status["State"] == "TERMINATED"
 # cluster['Status']['StateChangeReason']
-status["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(status["Timeline"]["CreationDateTime"], datetime)
-# status['Timeline']['EndDateTime'].should.equal(datetime(2014, 1, 24, 2, 19, 46, tzinfo=timezone.utc))
+# assert status['Timeline']['EndDateTime'] == datetime(2014, 1, 24, 2, 19, 46, tzinfo=timezone.utc)
-status["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(status["Timeline"]["ReadyDateTime"], datetime)
-dict((t["Key"], t["Value"]) for t in cl["Tags"]).should.equal(
-dict((t["Key"], t["Value"]) for t in args["Tags"])
-)
+assert {t["Key"]: t["Value"] for t in cl["Tags"]} == {
+t["Key"]: t["Value"] for t in args["Tags"]
+}
-cl["TerminationProtected"].should.equal(False)
+assert cl["TerminationProtected"] is False
-cl["VisibleToAllUsers"].should.equal(True)
+assert cl["VisibleToAllUsers"] is True
-cl["ClusterArn"].should.equal(
-f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:cluster/{cluster_id}"
-)
+assert (
+cl["ClusterArn"]
+== f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:cluster/{cluster_id}"
+)
@@ -229,23 +233,23 @@ def test_describe_job_flows():
 }
 resp = client.describe_job_flows()
-resp["JobFlows"].should.have.length_of(6)
+assert len(resp["JobFlows"]) == 6
 for cluster_id in expected:
 resp = client.describe_job_flows(JobFlowIds=[cluster_id])
-resp["JobFlows"].should.have.length_of(1)
+assert len(resp["JobFlows"]) == 1
-resp["JobFlows"][0]["JobFlowId"].should.equal(cluster_id)
+assert resp["JobFlows"][0]["JobFlowId"] == cluster_id
 resp = client.describe_job_flows(JobFlowStates=["WAITING"])
-resp["JobFlows"].should.have.length_of(4)
+assert len(resp["JobFlows"]) == 4
 for x in resp["JobFlows"]:
-x["ExecutionStatusDetail"]["State"].should.equal("WAITING")
+assert x["ExecutionStatusDetail"]["State"] == "WAITING"
 resp = client.describe_job_flows(CreatedBefore=timestamp)
-resp["JobFlows"].should.have.length_of(4)
+assert len(resp["JobFlows"]) == 4
 resp = client.describe_job_flows(CreatedAfter=timestamp)
-resp["JobFlows"].should.have.length_of(2)
+assert len(resp["JobFlows"]) == 2
 @mock_emr
@@ -268,53 +272,54 @@ def test_describe_job_flow():
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
-jf["AmiVersion"].should.equal(args["AmiVersion"])
+assert jf["AmiVersion"] == args["AmiVersion"]
-jf.shouldnt.have.key("BootstrapActions")
+assert "BootstrapActions" not in jf
 esd = jf["ExecutionStatusDetail"]
-esd["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(esd["CreationDateTime"], datetime)
-# esd['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(esd['EndDateTime'], 'datetime.datetime')
-# esd['LastStateChangeReason'].should.be.a(str)
+# assert isinstance(esd['LastStateChangeReason'], str)
-esd["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(esd["ReadyDateTime"], datetime)
-esd["StartDateTime"].should.be.a("datetime.datetime")
+assert isinstance(esd["StartDateTime"], datetime)
-esd["State"].should.equal("WAITING")
+assert esd["State"] == "WAITING"
 attrs = jf["Instances"]
-attrs["Ec2KeyName"].should.equal(args["Instances"]["Ec2KeyName"])
+assert attrs["Ec2KeyName"] == args["Instances"]["Ec2KeyName"]
-attrs["Ec2SubnetId"].should.equal(args["Instances"]["Ec2SubnetId"])
+assert attrs["Ec2SubnetId"] == args["Instances"]["Ec2SubnetId"]
-attrs["HadoopVersion"].should.equal(args["Instances"]["HadoopVersion"])
+assert attrs["HadoopVersion"] == args["Instances"]["HadoopVersion"]
-attrs["InstanceCount"].should.equal(args["Instances"]["InstanceCount"])
+assert attrs["InstanceCount"] == args["Instances"]["InstanceCount"]
 for ig in attrs["InstanceGroups"]:
 # ig['BidPrice']
-ig["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(ig["CreationDateTime"], datetime)
-# ig['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(ig['EndDateTime'], 'datetime.datetime')
-ig["InstanceGroupId"].should.be.a(str)
+assert isinstance(ig["InstanceGroupId"], str)
-ig["InstanceRequestCount"].should.be.a(int)
+assert isinstance(ig["InstanceRequestCount"], int)
-ig["InstanceRole"].should.be.within(["MASTER", "CORE"])
+assert ig["InstanceRole"] in ["MASTER", "CORE"]
-ig["InstanceRunningCount"].should.be.a(int)
+assert isinstance(ig["InstanceRunningCount"], int)
-ig["InstanceType"].should.be.within(["c3.medium", "c3.xlarge"])
+assert ig["InstanceType"] in ["c3.medium", "c3.xlarge"]
-# ig['LastStateChangeReason'].should.be.a(str)
+# assert isinstance(ig['LastStateChangeReason'], str)
-ig["Market"].should.equal("ON_DEMAND")
+assert ig["Market"] == "ON_DEMAND"
-ig["Name"].should.be.a(str)
+assert isinstance(ig["Name"], str)
-ig["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(ig["ReadyDateTime"], datetime)
-ig["StartDateTime"].should.be.a("datetime.datetime")
+assert isinstance(ig["StartDateTime"], datetime)
-ig["State"].should.equal("RUNNING")
+assert ig["State"] == "RUNNING"
-attrs["KeepJobFlowAliveWhenNoSteps"].should.equal(True)
+assert attrs["KeepJobFlowAliveWhenNoSteps"] is True
-# attrs['MasterInstanceId'].should.be.a(str)
+# assert isinstance(attrs['MasterInstanceId'], str)
-attrs["MasterInstanceType"].should.equal(args["Instances"]["MasterInstanceType"])
+assert attrs["MasterInstanceType"] == args["Instances"]["MasterInstanceType"]
-attrs["MasterPublicDnsName"].should.be.a(str)
+assert isinstance(attrs["MasterPublicDnsName"], str)
-attrs["NormalizedInstanceHours"].should.equal(0)
+assert attrs["NormalizedInstanceHours"] == 0
-attrs["Placement"]["AvailabilityZone"].should.equal(
-args["Instances"]["Placement"]["AvailabilityZone"]
-)
+assert (
+attrs["Placement"]["AvailabilityZone"]
+== args["Instances"]["Placement"]["AvailabilityZone"]
+)
-attrs["SlaveInstanceType"].should.equal(args["Instances"]["SlaveInstanceType"])
+assert attrs["SlaveInstanceType"] == args["Instances"]["SlaveInstanceType"]
-attrs["TerminationProtected"].should.equal(False)
+assert attrs["TerminationProtected"] is False
-jf["JobFlowId"].should.equal(cluster_id)
+assert jf["JobFlowId"] == cluster_id
-jf["JobFlowRole"].should.equal(args["JobFlowRole"])
+assert jf["JobFlowRole"] == args["JobFlowRole"]
-jf["LogUri"].should.equal(args["LogUri"])
+assert jf["LogUri"] == args["LogUri"]
-jf["Name"].should.equal(args["Name"])
+assert jf["Name"] == args["Name"]
-jf["ServiceRole"].should.equal(args["ServiceRole"])
+assert jf["ServiceRole"] == args["ServiceRole"]
-jf["Steps"].should.equal([])
+assert jf["Steps"] == []
-jf["SupportedProducts"].should.equal([])
+assert jf["SupportedProducts"] == []
-jf["VisibleToAllUsers"].should.equal(True)
+assert jf["VisibleToAllUsers"] is True
 @mock_emr
@@ -356,34 +361,34 @@ def test_list_clusters():
 while 1:
 resp = client.list_clusters(**args)
 clusters = resp["Clusters"]
-len(clusters).should.be.lower_than_or_equal_to(50)
+assert len(clusters) <= 50
 for x in clusters:
 y = expected[x["Id"]]
-x["Id"].should.equal(y["Id"])
+assert x["Id"] == y["Id"]
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
-x["NormalizedInstanceHours"].should.equal(y["NormalizedInstanceHours"])
+assert x["NormalizedInstanceHours"] == y["NormalizedInstanceHours"]
-x["Status"]["State"].should.equal(y["State"])
+assert x["Status"]["State"] == y["State"]
-x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["CreationDateTime"], datetime)
 if y["State"] == "TERMINATED":
-x["Status"]["Timeline"]["EndDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["EndDateTime"], datetime)
 else:
-x["Status"]["Timeline"].shouldnt.have.key("EndDateTime")
+assert "EndDateTime" not in x["Status"]["Timeline"]
-x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["ReadyDateTime"], datetime)
 marker = resp.get("Marker")
 if marker is None:
 break
 args = {"Marker": marker}
 resp = client.list_clusters(ClusterStates=["TERMINATED"])
-resp["Clusters"].should.have.length_of(30)
+assert len(resp["Clusters"]) == 30
 for x in resp["Clusters"]:
-x["Status"]["State"].should.equal("TERMINATED")
+assert x["Status"]["State"] == "TERMINATED"
 resp = client.list_clusters(CreatedBefore=timestamp)
-resp["Clusters"].should.have.length_of(40)
+assert len(resp["Clusters"]) == 40
 resp = client.list_clusters(CreatedAfter=timestamp)
-resp["Clusters"].should.have.length_of(30)
+assert len(resp["Clusters"]) == 30
 @mock_emr
@@ -397,19 +402,20 @@ def test_run_job_flow():
 )
 job_flow_id = resp["JobFlowId"]
 resp = client.describe_job_flows(JobFlowIds=[job_flow_id])["JobFlows"][0]
-resp["ExecutionStatusDetail"]["State"].should.equal("WAITING")
+assert resp["ExecutionStatusDetail"]["State"] == "WAITING"
-resp["JobFlowId"].should.equal(job_flow_id)
+assert resp["JobFlowId"] == job_flow_id
-resp["Name"].should.equal(args["Name"])
+assert resp["Name"] == args["Name"]
-resp["Instances"]["MasterInstanceType"].should.equal(
-args["Instances"]["MasterInstanceType"]
-)
+assert (
+resp["Instances"]["MasterInstanceType"]
+== args["Instances"]["MasterInstanceType"]
+)
-resp["Instances"]["SlaveInstanceType"].should.equal(
-args["Instances"]["SlaveInstanceType"]
-)
+assert (
+resp["Instances"]["SlaveInstanceType"] == args["Instances"]["SlaveInstanceType"]
+)
-resp["LogUri"].should.equal(args["LogUri"])
+assert resp["LogUri"] == args["LogUri"]
-resp["VisibleToAllUsers"].should.equal(args["VisibleToAllUsers"])
+assert resp["VisibleToAllUsers"] == args["VisibleToAllUsers"]
-resp["Instances"]["NormalizedInstanceHours"].should.equal(0)
+assert resp["Instances"]["NormalizedInstanceHours"] == 0
-resp["Steps"].should.equal([])
+assert resp["Steps"] == []
 @mock_emr
@@ -421,7 +427,7 @@ def test_run_job_flow_with_invalid_params():
 args["AmiVersion"] = "2.4"
 args["ReleaseLabel"] = "emr-5.0.0"
 client.run_job_flow(**args)
-ex.value.response["Error"]["Code"].should.equal("ValidationException")
+assert ex.value.response["Error"]["Code"] == "ValidationException"
 @mock_emr
@@ -437,14 +443,14 @@ def test_run_job_flow_in_multiple_regions():
 for region in regions.keys():
 client = regions[region]["client"]
 resp = client.describe_cluster(ClusterId=regions[region]["cluster_id"])
-resp["Cluster"]["Name"].should.equal(region)
+assert resp["Cluster"]["Name"] == region
 @mock_emr
 def test_run_job_flow_with_new_params():
 client = boto3.client("emr", region_name="us-east-1")
 resp = client.run_job_flow(**run_job_flow_args)
-resp.should.have.key("JobFlowId")
+assert "JobFlowId" in resp
 @mock_emr
@@ -456,7 +462,7 @@ def test_run_job_flow_with_visible_to_all_users():
 resp = client.run_job_flow(**args)
 cluster_id = resp["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["VisibleToAllUsers"].should.equal(expected)
+assert resp["Cluster"]["VisibleToAllUsers"] == expected
 def _do_assertion_ebs_configuration(x, y):
@@ -470,8 +476,8 @@ def _do_assertion_ebs_configuration(x, y):
 comp_total_size = 0
 for ebs_block in x["EbsBlockDevices"]:
 comp_total_size += ebs_block["VolumeSpecification"]["SizeInGB"]
-len(x["EbsBlockDevices"]).should.equal(total_volumes)
+assert len(x["EbsBlockDevices"]) == total_volumes
-comp_total_size.should.equal(comp_total_size)
+assert comp_total_size == comp_total_size
 @mock_emr
@@ -484,13 +490,13 @@ def test_run_job_flow_with_instance_groups():
 groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
 for x in groups:
 y = input_groups[x["Name"]]
-x.should.have.key("Id")
+assert "Id" in x
-x["RequestedInstanceCount"].should.equal(y["InstanceCount"])
+assert x["RequestedInstanceCount"] == y["InstanceCount"]
-x["InstanceGroupType"].should.equal(y["InstanceRole"])
+assert x["InstanceGroupType"] == y["InstanceRole"]
-x["InstanceType"].should.equal(y["InstanceType"])
+assert x["InstanceType"] == y["InstanceType"]
-x["Market"].should.equal(y["Market"])
+assert x["Market"] == y["Market"]
 if "BidPrice" in y:
-x["BidPrice"].should.equal(y["BidPrice"])
+assert x["BidPrice"] == y["BidPrice"]
 if "EbsConfiguration" in y:
 _do_assertion_ebs_configuration(x, y)
@@ -542,7 +548,7 @@ def test_run_job_flow_with_instance_groups_with_autoscaling():
 for x in groups:
 y = deepcopy(input_groups[x["Name"]])
 if "AutoScalingPolicy" in y:
-x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED")
+assert x["AutoScalingPolicy"]["Status"]["State"] == "ATTACHED"
 returned_policy = deepcopy(x["AutoScalingPolicy"])
 auto_scaling_policy_with_cluster_id = (
 _patch_cluster_id_placeholder_in_autoscaling_policy(
@@ -550,7 +556,7 @@ def test_run_job_flow_with_instance_groups_with_autoscaling():
 )
 )
 del returned_policy["Status"]
-returned_policy.should.equal(auto_scaling_policy_with_cluster_id)
+assert returned_policy == auto_scaling_policy_with_cluster_id
 @mock_emr
@@ -579,9 +585,10 @@ def test_put_remove_auto_scaling_policy():
 )
 )
 del resp["AutoScalingPolicy"]["Status"]
-resp["AutoScalingPolicy"].should.equal(auto_scaling_policy_with_cluster_id)
+assert resp["AutoScalingPolicy"] == auto_scaling_policy_with_cluster_id
-resp["ClusterArn"].should.equal(
-f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:cluster/{cluster_id}"
-)
+assert (
+resp["ClusterArn"]
+== f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:cluster/{cluster_id}"
+)
 core_instance_group = [
@@ -590,7 +597,7 @@ def test_put_remove_auto_scaling_policy():
 if ig["InstanceGroupType"] == "CORE"
 ][0]
-("AutoScalingPolicy" in core_instance_group).should.equal(True)
+assert "AutoScalingPolicy" in core_instance_group
 client.remove_auto_scaling_policy(
 ClusterId=cluster_id, InstanceGroupId=core_instance_group["Id"]
@@ -602,7 +609,7 @@ def test_put_remove_auto_scaling_policy():
 if ig["InstanceGroupType"] == "CORE"
 ][0]
-("AutoScalingPolicy" not in core_instance_group).should.equal(True)
+assert "AutoScalingPolicy" not in core_instance_group
 def _patch_cluster_id_placeholder_in_autoscaling_policy(policy, cluster_id):
@@ -623,18 +630,17 @@ def test_run_job_flow_with_custom_ami():
 args["CustomAmiId"] = "MyEmrCustomId"
 args["ReleaseLabel"] = "emr-5.6.0"
 client.run_job_flow(**args)
-ex.value.response["Error"]["Code"].should.equal("ValidationException")
+assert ex.value.response["Error"]["Code"] == "ValidationException"
-ex.value.response["Error"]["Message"].should.equal("Custom AMI is not allowed")
+assert ex.value.response["Error"]["Message"] == "Custom AMI is not allowed"
 with pytest.raises(ClientError) as ex:
 args = deepcopy(run_job_flow_args)
 args["CustomAmiId"] = "MyEmrCustomId"
 args["AmiVersion"] = "3.8.1"
 client.run_job_flow(**args)
-ex.value.response["Error"]["Code"].should.equal("ValidationException")
-ex.value.response["Error"]["Message"].should.equal(
-"Custom AMI is not supported in this version of EMR"
-)
+err = ex.value.response["Error"]
+assert err["Code"] == "ValidationException"
+assert err["Message"] == "Custom AMI is not supported in this version of EMR"
 with pytest.raises(ClientError) as ex:
 # AMI version and release label exception raises before CustomAmi exception
@@ -643,17 +649,16 @@ def test_run_job_flow_with_custom_ami():
 args["ReleaseLabel"] = "emr-5.6.0"
 args["AmiVersion"] = "3.8.1"
 client.run_job_flow(**args)
-ex.value.response["Error"]["Code"].should.equal("ValidationException")
-ex.value.response["Error"]["Message"].should.contain(
-"Only one AMI version and release label may be specified."
-)
+err = ex.value.response["Error"]
+assert err["Code"] == "ValidationException"
+assert "Only one AMI version and release label may be specified." in err["Message"]
 args = deepcopy(run_job_flow_args)
 args["CustomAmiId"] = "MyEmrCustomAmi"
 args["ReleaseLabel"] = "emr-5.31.0"
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["CustomAmiId"].should.equal("MyEmrCustomAmi")
+assert resp["Cluster"]["CustomAmiId"] == "MyEmrCustomAmi"
 @mock_emr
@@ -663,9 +668,9 @@ def test_run_job_flow_with_step_concurrency():
 args["StepConcurrencyLevel"] = 2
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-resp["Name"].should.equal(args["Name"])
+assert resp["Name"] == args["Name"]
-resp["Status"]["State"].should.equal("WAITING")
+assert resp["Status"]["State"] == "WAITING"
-resp["StepConcurrencyLevel"].should.equal(2)
+assert resp["StepConcurrencyLevel"] == 2
 @mock_emr
@@ -675,15 +680,15 @@ def test_modify_cluster():
 args["StepConcurrencyLevel"] = 2
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-resp["Name"].should.equal(args["Name"])
+assert resp["Name"] == args["Name"]
-resp["Status"]["State"].should.equal("WAITING")
+assert resp["Status"]["State"] == "WAITING"
-resp["StepConcurrencyLevel"].should.equal(2)
+assert resp["StepConcurrencyLevel"] == 2
 resp = client.modify_cluster(ClusterId=cluster_id, StepConcurrencyLevel=4)
-resp["StepConcurrencyLevel"].should.equal(4)
+assert resp["StepConcurrencyLevel"] == 4
 resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-resp["StepConcurrencyLevel"].should.equal(4)
+assert resp["StepConcurrencyLevel"] == 4
 @mock_emr
@@ -694,14 +699,14 @@ def test_set_termination_protection():
 resp = client.run_job_flow(**args)
 cluster_id = resp["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["TerminationProtected"].should.equal(False)
+assert resp["Cluster"]["TerminationProtected"] is False
 for expected in (True, False):
 resp = client.set_termination_protection(
 JobFlowIds=[cluster_id], TerminationProtected=expected
 )
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["TerminationProtected"].should.equal(expected)
+assert resp["Cluster"]["TerminationProtected"] == expected
 @mock_emr
@@ -715,9 +720,10 @@ def test_terminate_protected_job_flow_raises_error():
 with pytest.raises(ClientError) as ex:
 client.terminate_job_flows(JobFlowIds=[cluster_id])
 error = ex.value.response["Error"]
-error["Code"].should.equal("ValidationException")
+assert error["Code"] == "ValidationException"
-error["Message"].should.equal(
-"Could not shut down one or more job flows since they are termination protected."
-)
+assert (
+error["Message"]
+== "Could not shut down one or more job flows since they are termination protected."
+)
@@ -729,14 +735,14 @@ def test_set_visible_to_all_users():
 resp = client.run_job_flow(**args)
 cluster_id = resp["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["VisibleToAllUsers"].should.equal(False)
+assert resp["Cluster"]["VisibleToAllUsers"] is False
 for expected in (True, False):
 resp = client.set_visible_to_all_users(
 JobFlowIds=[cluster_id], VisibleToAllUsers=expected
 )
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["VisibleToAllUsers"].should.equal(expected)
+assert resp["Cluster"]["VisibleToAllUsers"] == expected
 @mock_emr
@@ -746,11 +752,11 @@ def test_terminate_job_flows():
 resp = client.run_job_flow(**run_job_flow_args)
 cluster_id = resp["JobFlowId"]
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["Status"]["State"].should.equal("WAITING")
+assert resp["Cluster"]["Status"]["State"] == "WAITING"
 resp = client.terminate_job_flows(JobFlowIds=[cluster_id])
 resp = client.describe_cluster(ClusterId=cluster_id)
-resp["Cluster"]["Status"]["State"].should.equal("TERMINATED")
+assert resp["Cluster"]["Status"]["State"] == "TERMINATED"
 # testing multiple end points for each feature
@@ -779,14 +785,14 @@ def test_bootstrap_actions():
 cl = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
 for x, y in zip(cl["BootstrapActions"], bootstrap_actions):
-x["BootstrapActionConfig"].should.equal(y)
+assert x["BootstrapActionConfig"] == y
 resp = client.list_bootstrap_actions(ClusterId=cluster_id)
 for x, y in zip(resp["BootstrapActions"], bootstrap_actions):
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
 if "Args" in y["ScriptBootstrapAction"]:
-x["Args"].should.equal(y["ScriptBootstrapAction"]["Args"])
+assert x["Args"] == y["ScriptBootstrapAction"]["Args"]
-x["ScriptPath"].should.equal(y["ScriptBootstrapAction"]["Path"])
+assert x["ScriptPath"] == y["ScriptBootstrapAction"]["Path"]
 @mock_emr
@@ -798,39 +804,35 @@ def test_instances():
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
 instances = client.list_instances(ClusterId=cluster_id)["Instances"]
-len(instances).should.equal(sum(g["InstanceCount"] for g in input_instance_groups))
+assert len(instances) == sum(g["InstanceCount"] for g in input_instance_groups)
 for x in instances:
-x.should.have.key("InstanceGroupId")
+assert "InstanceGroupId" in x
 instance_group = [
 j
 for j in jf["Instances"]["InstanceGroups"]
 if j["InstanceGroupId"] == x["InstanceGroupId"]
 ]
-len(instance_group).should.equal(1)
+assert len(instance_group) == 1
 y = input_groups[instance_group[0]["Name"]]
-x.should.have.key("Id")
+assert "Id" in x
-x.should.have.key("Ec2InstanceId")
+assert "Ec2InstanceId" in x
-x.should.have.key("PublicDnsName")
+assert "PublicDnsName" in x
-x.should.have.key("PublicIpAddress")
+assert "PublicIpAddress" in x
-x.should.have.key("PrivateDnsName")
+assert "PrivateDnsName" in x
-x.should.have.key("PrivateIpAddress")
+assert "PrivateIpAddress" in x
-x.should.have.key("InstanceFleetId")
+assert "InstanceFleetId" in x
-x["InstanceType"].should.equal(y["InstanceType"])
+assert x["InstanceType"] == y["InstanceType"]
-x["Market"].should.equal(y["Market"])
+assert x["Market"] == y["Market"]
-x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["ReadyDateTime"], datetime)
-x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["CreationDateTime"], datetime)
-x["Status"]["State"].should.equal("RUNNING")
+assert x["Status"]["State"] == "RUNNING"
 for x in [["MASTER"], ["CORE"], ["TASK"], ["MASTER", "TASK"]]:
 instances = client.list_instances(ClusterId=cluster_id, InstanceGroupTypes=x)[
 "Instances"
 ]
-len(instances).should.equal(
-sum(
-g["InstanceCount"]
-for g in input_instance_groups
-if g["InstanceRole"] in x
-)
-)
+assert len(instances) == sum(
+g["InstanceCount"] for g in input_instance_groups if g["InstanceRole"] in x
+)
@@ -856,33 +858,33 @@ def test_instance_groups():
 )
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
-jf["Instances"]["InstanceCount"].should.equal(
-sum(g["InstanceCount"] for g in input_instance_groups)
-)
+assert jf["Instances"]["InstanceCount"] == sum(
+g["InstanceCount"] for g in input_instance_groups
+)
 for x in jf["Instances"]["InstanceGroups"]:
 y = input_groups[x["Name"]]
 if "BidPrice" in y:
-x["BidPrice"].should.equal(y["BidPrice"])
+assert x["BidPrice"] == y["BidPrice"]
-x["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["CreationDateTime"], datetime)
-# x['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(x['EndDateTime'], 'datetime.datetime')
-x.should.have.key("InstanceGroupId")
+assert "InstanceGroupId" in x
-x["InstanceRequestCount"].should.equal(y["InstanceCount"])
+assert x["InstanceRequestCount"] == y["InstanceCount"]
-x["InstanceRole"].should.equal(y["InstanceRole"])
+assert x["InstanceRole"] == y["InstanceRole"]
-x["InstanceRunningCount"].should.equal(y["InstanceCount"])
+assert x["InstanceRunningCount"] == y["InstanceCount"]
-x["InstanceType"].should.equal(y["InstanceType"])
+assert x["InstanceType"] == y["InstanceType"]
-# x['LastStateChangeReason'].should.equal(y['LastStateChangeReason'])
+# assert x['LastStateChangeReason'] == y['LastStateChangeReason']
-x["Market"].should.equal(y["Market"])
+assert x["Market"] == y["Market"]
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
-x["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["ReadyDateTime"], datetime)
-x["StartDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["StartDateTime"], datetime)
-x["State"].should.equal("RUNNING")
+assert x["State"] == "RUNNING"
 groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
 for x in groups:
 y = deepcopy(input_groups[x["Name"]])
 if "BidPrice" in y:
-x["BidPrice"].should.equal(y["BidPrice"])
+assert x["BidPrice"] == y["BidPrice"]
 if "AutoScalingPolicy" in y:
-x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED")
+assert x["AutoScalingPolicy"]["Status"]["State"] == "ATTACHED"
 returned_policy = dict(x["AutoScalingPolicy"])
 del returned_policy["Status"]
 policy = json.loads(
@@ -890,26 +892,26 @@ def test_instance_groups():
 "${emr.clusterId}", cluster_id
 )
 )
-returned_policy.should.equal(policy)
+assert returned_policy == policy
 if "EbsConfiguration" in y:
 _do_assertion_ebs_configuration(x, y)
 # Configurations
 # EbsBlockDevices
 # EbsOptimized
-x.should.have.key("Id")
+assert "Id" in x
-x["InstanceGroupType"].should.equal(y["InstanceRole"])
+assert x["InstanceGroupType"] == y["InstanceRole"]
-x["InstanceType"].should.equal(y["InstanceType"])
+assert x["InstanceType"] == y["InstanceType"]
-x["Market"].should.equal(y["Market"])
+assert x["Market"] == y["Market"]
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
-x["RequestedInstanceCount"].should.equal(y["InstanceCount"])
+assert x["RequestedInstanceCount"] == y["InstanceCount"]
-x["RunningInstanceCount"].should.equal(y["InstanceCount"])
+assert x["RunningInstanceCount"] == y["InstanceCount"]
 # ShrinkPolicy
-x["Status"]["State"].should.equal("RUNNING")
+assert x["Status"]["State"] == "RUNNING"
-x["Status"]["StateChangeReason"]["Code"].should.be.a(str)
+assert isinstance(x["Status"]["StateChangeReason"]["Code"], str)
-# x['Status']['StateChangeReason']['Message'].should.be.a(str)
+# assert isinstance(x['Status']['StateChangeReason']['Message'], str)
-x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["CreationDateTime"], datetime)
-# x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(x['Status']['Timeline']['EndDateTime'], 'datetime.datetime')
-x["Status"]["Timeline"]["ReadyDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["ReadyDateTime"], datetime)
 igs = dict((g["Name"], g) for g in groups)
 client.modify_instance_groups(
@@ -919,10 +921,10 @@ def test_instance_groups():
 ]
 )
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
-jf["Instances"]["InstanceCount"].should.equal(base_instance_count + 5)
+assert jf["Instances"]["InstanceCount"] == base_instance_count + 5
 igs = dict((g["Name"], g) for g in jf["Instances"]["InstanceGroups"])
-igs["task-1"]["InstanceRunningCount"].should.equal(2)
+assert igs["task-1"]["InstanceRunningCount"] == 2
-igs["task-2"]["InstanceRunningCount"].should.equal(3)
+assert igs["task-2"]["InstanceRunningCount"] == 3
 @mock_emr
@@ -983,82 +985,87 @@ def test_steps():
 cluster_id = client.run_job_flow(**args)["JobFlowId"]
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
-jf["Steps"].should.have.length_of(1)
+assert len(jf["Steps"]) == 1
 client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[input_steps[1]])
 jf = client.describe_job_flows(JobFlowIds=[cluster_id])["JobFlows"][0]
-jf["Steps"].should.have.length_of(2)
+assert len(jf["Steps"]) == 2
 for idx, (x, y) in enumerate(zip(jf["Steps"], input_steps)):
-x["ExecutionStatusDetail"].should.have.key("CreationDateTime")
+assert "CreationDateTime" in x["ExecutionStatusDetail"]
-# x['ExecutionStatusDetail'].should.have.key('EndDateTime')
+# assert 'EndDateTime' in x['ExecutionStatusDetail']
-# x['ExecutionStatusDetail'].should.have.key('LastStateChangeReason')
+# assert 'LastStateChangeReason' in x['ExecutionStatusDetail']
-# x['ExecutionStatusDetail'].should.have.key('StartDateTime')
+# assert 'StartDateTime' in x['ExecutionStatusDetail']
-x["ExecutionStatusDetail"]["State"].should.equal(
-"RUNNING" if idx == 0 else "PENDING"
-)
+assert (
+x["ExecutionStatusDetail"]["State"] == "RUNNING" if idx == 0 else "PENDING"
+)
-x["StepConfig"]["ActionOnFailure"].should.equal("TERMINATE_CLUSTER")
+assert x["StepConfig"]["ActionOnFailure"] == "TERMINATE_CLUSTER"
-x["StepConfig"]["HadoopJarStep"]["Args"].should.equal(
-y["HadoopJarStep"]["Args"]
-)
-x["StepConfig"]["HadoopJarStep"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"])
+assert x["StepConfig"]["HadoopJarStep"]["Args"] == y["HadoopJarStep"]["Args"]
+assert x["StepConfig"]["HadoopJarStep"]["Jar"] == y["HadoopJarStep"]["Jar"]
 if "MainClass" in y["HadoopJarStep"]:
-x["StepConfig"]["HadoopJarStep"]["MainClass"].should.equal(
-y["HadoopJarStep"]["MainClass"]
-)
+assert (
+x["StepConfig"]["HadoopJarStep"]["MainClass"]
+== y["HadoopJarStep"]["MainClass"]
+)
 if "Properties" in y["HadoopJarStep"]:
-x["StepConfig"]["HadoopJarStep"]["Properties"].should.equal(
-y["HadoopJarStep"]["Properties"]
-)
+assert (
+x["StepConfig"]["HadoopJarStep"]["Properties"]
+== y["HadoopJarStep"]["Properties"]
+)
-x["StepConfig"]["Name"].should.equal(y["Name"])
+assert x["StepConfig"]["Name"] == y["Name"]
 expected = dict((s["Name"], s) for s in input_steps)
 steps = client.list_steps(ClusterId=cluster_id)["Steps"]
-steps.should.have.length_of(2)
+assert len(steps) == 2
 # Steps should be returned in reverse order.
-sorted(
-steps, key=lambda o: o["Status"]["Timeline"]["CreationDateTime"], reverse=True
-).should.equal(steps)
+assert (
+sorted(
+steps,
+key=lambda o: o["Status"]["Timeline"]["CreationDateTime"],
+reverse=True,
+)
+== steps
+)
 for x in steps:
 y = expected[x["Name"]]
-x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER")
+assert x["ActionOnFailure"] == "TERMINATE_CLUSTER"
-x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"])
+assert x["Config"]["Args"] == y["HadoopJarStep"]["Args"]
-x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"])
+assert x["Config"]["Jar"] == y["HadoopJarStep"]["Jar"]
-# x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass'])
+# assert x['Config']['MainClass'] == y['HadoopJarStep']['MainClass']
 # Properties
-x["Id"].should.be.a(str)
+assert isinstance(x["Id"], str)
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
-x["Status"]["State"].should.be.within(["RUNNING", "PENDING"])
+assert x["Status"]["State"] in ["RUNNING", "PENDING"]
 # StateChangeReason
-x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["CreationDateTime"], datetime)
-# x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(x['Status']['Timeline']['EndDateTime'], 'datetime.datetime')
 # Only the first step will have started - we don't know anything about when it finishes, so the second step never starts
 if x["Name"] == "My wordcount example":
-x["Status"]["Timeline"]["StartDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["StartDateTime"], datetime)
 x = client.describe_step(ClusterId=cluster_id, StepId=x["Id"])["Step"]
-x["ActionOnFailure"].should.equal("TERMINATE_CLUSTER")
+assert x["ActionOnFailure"] == "TERMINATE_CLUSTER"
-x["Config"]["Args"].should.equal(y["HadoopJarStep"]["Args"])
+assert x["Config"]["Args"] == y["HadoopJarStep"]["Args"]
-x["Config"]["Jar"].should.equal(y["HadoopJarStep"]["Jar"])
+assert x["Config"]["Jar"] == y["HadoopJarStep"]["Jar"]
-# x['Config']['MainClass'].should.equal(y['HadoopJarStep']['MainClass'])
+# assert x['Config']['MainClass'] == y['HadoopJarStep']['MainClass']
 # Properties
-x["Id"].should.be.a(str)
+assert isinstance(x["Id"], str)
-x["Name"].should.equal(y["Name"])
+assert x["Name"] == y["Name"]
-x["Status"]["State"].should.be.within(["RUNNING", "PENDING"])
+assert x["Status"]["State"] in ["RUNNING", "PENDING"]
 # StateChangeReason
-x["Status"]["Timeline"]["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(x["Status"]["Timeline"]["CreationDateTime"], datetime)
-# x['Status']['Timeline']['EndDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(x['Status']['Timeline']['EndDateTime'], 'datetime.datetime')
-# x['Status']['Timeline']['StartDateTime'].should.be.a('datetime.datetime')
+# assert isinstance(x['Status']['Timeline']['StartDateTime'], 'datetime.datetime')
 step_id = steps[-1]["Id"] # Last step is first created step.
 steps = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])["Steps"]
-steps.should.have.length_of(1)
+assert len(steps) == 1
-steps[0]["Id"].should.equal(step_id)
+assert steps[0]["Id"] == step_id
 steps = client.list_steps(ClusterId=cluster_id, StepStates=["RUNNING"])["Steps"]
-steps.should.have.length_of(1)
+assert len(steps) == 1
-steps[0]["Id"].should.equal(step_id)
+assert steps[0]["Id"] == step_id
 @mock_emr
@@ -1073,14 +1080,14 @@ def test_tags():
 client.add_tags(ResourceId=cluster_id, Tags=input_tags)
 resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-resp["Tags"].should.have.length_of(2)
+assert len(resp["Tags"]) == 2
-dict((t["Key"], t["Value"]) for t in resp["Tags"]).should.equal(
-dict((t["Key"], t["Value"]) for t in input_tags)
-)
+assert {t["Key"]: t["Value"] for t in resp["Tags"]} == {
+t["Key"]: t["Value"] for t in input_tags
+}
 client.remove_tags(ResourceId=cluster_id, TagKeys=[t["Key"] for t in input_tags])
 resp = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-resp["Tags"].should.equal([])
+assert resp["Tags"] == []
 @mock_emr
@@ -1108,28 +1115,32 @@ def test_security_configurations():
 Name=security_configuration_name, SecurityConfiguration=security_configuration
 )
-resp["Name"].should.equal(security_configuration_name)
+assert resp["Name"] == security_configuration_name
-resp["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(resp["CreationDateTime"], datetime)
 resp = client.describe_security_configuration(Name=security_configuration_name)
-resp["Name"].should.equal(security_configuration_name)
+assert resp["Name"] == security_configuration_name
-resp["SecurityConfiguration"].should.equal(security_configuration)
+assert resp["SecurityConfiguration"] == security_configuration
-resp["CreationDateTime"].should.be.a("datetime.datetime")
+assert isinstance(resp["CreationDateTime"], datetime)
 client.delete_security_configuration(Name=security_configuration_name)
 with pytest.raises(ClientError) as ex:
 client.describe_security_configuration(Name=security_configuration_name)
-ex.value.response["Error"]["Code"].should.equal("InvalidRequestException")
-ex.value.response["Error"]["Message"].should.match(
-r"Security configuration with name .* does not exist."
-)
+err = ex.value.response["Error"]
+assert err["Code"] == "InvalidRequestException"
+assert (
+err["Message"]
+== "Security configuration with name 'MySecurityConfiguration' does not exist."
+)
 with pytest.raises(ClientError) as ex:
 client.delete_security_configuration(Name=security_configuration_name)
-ex.value.response["Error"]["Code"].should.equal("InvalidRequestException")
-ex.value.response["Error"]["Message"].should.match(
-r"Security configuration with name .* does not exist."
-)
+err = ex.value.response["Error"]
+assert err["Code"] == "InvalidRequestException"
+assert (
+err["Message"]
+== "Security configuration with name 'MySecurityConfiguration' does not exist."
+)
@@ -1153,9 +1164,10 @@ def test_run_job_flow_with_invalid_number_of_master_nodes_raises_error():
 with pytest.raises(ClientError) as ex:
 client.run_job_flow(**params)
 error = ex.value.response["Error"]
-error["Code"].should.equal("ValidationException")
+assert error["Code"] == "ValidationException"
-error["Message"].should.equal(
-"Master instance group must have exactly 3 instances for HA clusters."
-)
+assert (
+error["Message"]
+== "Master instance group must have exactly 3 instances for HA clusters."
+)
@@ -1180,11 +1192,11 @@ def test_run_job_flow_with_multiple_master_nodes():
 )
 cluster_id = client.run_job_flow(**params)["JobFlowId"]
 cluster = client.describe_cluster(ClusterId=cluster_id)["Cluster"]
-cluster["AutoTerminate"].should.equal(False)
+assert cluster["AutoTerminate"] is False
-cluster["TerminationProtected"].should.equal(True)
+assert cluster["TerminationProtected"] is True
 groups = client.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
 master_instance_group = next(
 group for group in groups if group["InstanceGroupType"] == "MASTER"
 )
-master_instance_group["RequestedInstanceCount"].should.equal(3)
+assert master_instance_group["RequestedInstanceCount"] == 3
-master_instance_group["RunningInstanceCount"].should.equal(3)
+assert master_instance_group["RunningInstanceCount"] == 3


@@ -1,6 +1,5 @@
 import boto3
 import pytest
-import sure # noqa # pylint: disable=unused-import
 from moto import settings
 from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
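
For context, the ACCOUNT_ID imported here is the same value the converted assertions compare cluster ARNs against; a small sketch with placeholder region and cluster id (not from the changed files):

    from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

    region_name = "us-east-1"      # placeholder
    cluster_id = "j-EXAMPLE12345"  # placeholder
    arn = f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:cluster/{cluster_id}"
    assert arn.startswith(f"arn:aws:elasticmapreduce:{region_name}:{ACCOUNT_ID}:")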


@@ -1,5 +1,3 @@
-import sure # noqa # pylint: disable=unused-import
 import moto.server as server
 """
@@ -13,5 +11,5 @@ def test_describe_jobflows():
 res = test_client.get("/?Action=DescribeJobFlows")
-res.data.should.contain(b"<DescribeJobFlowsResult>")
+assert b"<DescribeJobFlowsResult>" in res.data
-res.data.should.contain(b"<JobFlows>")
+assert b"<JobFlows>" in res.data
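
For reference, this server test exercises moto's WSGI app through a Flask test client; a hedged sketch of the whole test after the conversion, assuming server.create_backend_app("emr") (only the request and the two assert lines appear in the hunk above):

    import moto.server as server

    def test_describe_jobflows():
        backend = server.create_backend_app("emr")
        test_client = backend.test_client()

        res = test_client.get("/?Action=DescribeJobFlows")
        assert b"<DescribeJobFlowsResult>" in res.data
        assert b"<JobFlows>" in res.data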