Techdebt: Replace sure with regular assertions in ECS (#6533)

This commit is contained in:
Bert Blommers 2023-07-18 15:33:16 +00:00 committed by GitHub
parent e3727fd40e
commit 9da27b76ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1060 additions and 1129 deletions

View File

@ -573,9 +573,7 @@ def test_modify_snapshot_attribute():
], "This snapshot should have public group permissions." ], "This snapshot should have public group permissions."
# Add is idempotent # Add is idempotent
ec2_client.modify_snapshot_attribute.when.called_with( ec2_client.modify_snapshot_attribute(**ADD_GROUP_ARGS)
**ADD_GROUP_ARGS
).should_not.throw(ClientError)
assert attributes["CreateVolumePermissions"] == [ assert attributes["CreateVolumePermissions"] == [
{"Group": "all"} {"Group": "all"}
], "This snapshot should have public group permissions." ], "This snapshot should have public group permissions."
@ -599,9 +597,7 @@ def test_modify_snapshot_attribute():
], "This snapshot should have no permissions." ], "This snapshot should have no permissions."
# Remove is idempotent # Remove is idempotent
ec2_client.modify_snapshot_attribute.when.called_with( ec2_client.modify_snapshot_attribute(**REMOVE_GROUP_ARGS)
**REMOVE_GROUP_ARGS
).should_not.throw(ClientError)
assert not attributes[ assert not attributes[
"CreateVolumePermissions" "CreateVolumePermissions"
], "This snapshot should have no permissions." ], "This snapshot should have no permissions."
@ -920,10 +916,10 @@ def test_kms_key_id_property_hidden_when_volume_not_encrypted():
client = boto3.client("ec2", region_name="us-east-1") client = boto3.client("ec2", region_name="us-east-1")
resp = client.create_volume(AvailabilityZone="us-east-1a", Encrypted=False, Size=10) resp = client.create_volume(AvailabilityZone="us-east-1a", Encrypted=False, Size=10)
assert resp["Encrypted"] is False assert resp["Encrypted"] is False
resp.should_not.have.key("KmsKeyId") assert "KmsKeyId" not in resp
resp = client.describe_volumes(VolumeIds=[resp["VolumeId"]]) resp = client.describe_volumes(VolumeIds=[resp["VolumeId"]])
assert resp["Volumes"][0]["Encrypted"] is False assert resp["Volumes"][0]["Encrypted"] is False
resp["Volumes"][0].should_not.have.key("KmsKeyId") assert "KmsKeyId" not in resp["Volumes"][0]
resource = boto3.resource("ec2", region_name="us-east-1") resource = boto3.resource("ec2", region_name="us-east-1")
volume = resource.create_volume( volume = resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=False, Size=10 AvailabilityZone="us-east-1a", Encrypted=False, Size=10

View File

@ -252,9 +252,9 @@ def test_eip_vpc_association():
) )
instance.load() instance.load()
address.reload() address.reload()
address.association_id.should_not.equal(None) assert address.association_id is not None
instance.public_ip_address.should_not.equal(None) assert instance.public_ip_address is not None
instance.public_dns_name.should_not.equal(None) assert instance.public_dns_name is not None
assert address.network_interface_id == instance.network_interfaces_attribute[0].get( assert address.network_interface_id == instance.network_interfaces_attribute[0].get(
"NetworkInterfaceId" "NetworkInterfaceId"
) )

View File

@ -13,7 +13,7 @@ def test_console_output():
instances = conn.create_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) instances = conn.create_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
output = instances[0].console_output() output = instances[0].console_output()
output.get("Output").should_not.equal(None) assert output.get("Output") is not None
@mock_ec2 @mock_ec2

View File

@ -100,7 +100,7 @@ def test_describe_instance_types_filter_by_vcpus():
assert "t2.nano" in types assert "t2.nano" in types
# not contain # not contain
types.should_not.contain("m5d.xlarge") assert "m5d.xlarge" not in types
@mock_ec2 @mock_ec2
@ -117,7 +117,7 @@ def test_describe_instance_types_filter_by_memory():
assert "t4g.nano" in types assert "t4g.nano" in types
# not contain # not contain
types.should_not.contain("m5d.xlarge") assert "m5d.xlarge" not in types
@mock_ec2 @mock_ec2
@ -134,7 +134,7 @@ def test_describe_instance_types_filter_by_bare_metal():
assert "a1.metal" in types assert "a1.metal" in types
# not contain # not contain
types.should_not.contain("t1.micro") assert "t1.micro" not in types
@mock_ec2 @mock_ec2
@ -151,7 +151,7 @@ def test_describe_instance_types_filter_by_burstable_performance_supported():
assert "t2.micro" in types assert "t2.micro" in types
# not contain # not contain
types.should_not.contain("t1.micro") assert "t1.micro" not in types
@mock_ec2 @mock_ec2
@ -168,7 +168,7 @@ def test_describe_instance_types_filter_by_current_generation():
assert "t2.micro" in types assert "t2.micro" in types
# not contain # not contain
types.should_not.contain("t1.micro") assert "t1.micro" not in types
@mock_ec2 @mock_ec2

View File

@ -176,9 +176,9 @@ def test_delete_network_acl():
updated_network_acls = client.describe_network_acls()["NetworkAcls"] updated_network_acls = client.describe_network_acls()["NetworkAcls"]
any( assert not any(
acl["NetworkAclId"] == network_acl.id for acl in updated_network_acls acl["NetworkAclId"] == network_acl.id for acl in updated_network_acls
).shouldnt.be.ok )
@mock_ec2 @mock_ec2

View File

@ -96,7 +96,7 @@ def test_describe_managed_prefix_lists_with_tags():
Filters=[{"Name": "tag:key", "Values": ["value"]}] Filters=[{"Name": "tag:key", "Values": ["value"]}]
)["PrefixLists"] )["PrefixLists"]
assert tagged_pl_id in [pl["PrefixListId"] for pl in tagged_lists] assert tagged_pl_id in [pl["PrefixListId"] for pl in tagged_lists]
[pl["PrefixListId"] for pl in tagged_lists].should_not.contain(untagged_pl_id) assert untagged_pl_id not in [pl["PrefixListId"] for pl in tagged_lists]
@mock_ec2 @mock_ec2

View File

@ -1114,7 +1114,7 @@ def test_associate_route_table_by_gateway():
assert verify[0]["Associations"][0]["Main"] is False assert verify[0]["Associations"][0]["Main"] is False
assert verify[0]["Associations"][0]["GatewayId"] == igw_id assert verify[0]["Associations"][0]["GatewayId"] == igw_id
assert verify[0]["Associations"][0]["RouteTableAssociationId"] == assoc_id assert verify[0]["Associations"][0]["RouteTableAssociationId"] == assoc_id
verify[0]["Associations"][0].doesnt.have.key("SubnetId") assert "SubnetId" not in verify[0]["Associations"][0]
@mock_ec2 @mock_ec2
@ -1139,7 +1139,7 @@ def test_associate_route_table_by_subnet():
assert verify[0]["Associations"][0]["Main"] is False assert verify[0]["Associations"][0]["Main"] is False
assert verify[0]["Associations"][0]["SubnetId"] == subnet_id assert verify[0]["Associations"][0]["SubnetId"] == subnet_id
assert verify[0]["Associations"][0]["RouteTableAssociationId"] == assoc_id assert verify[0]["Associations"][0]["RouteTableAssociationId"] == assoc_id
verify[0]["Associations"][0].doesnt.have.key("GatewayId") assert "GatewayId" not in verify[0]["Associations"][0]
def setup_vpc(): def setup_vpc():

View File

@ -1,13 +1,11 @@
from botocore.exceptions import ClientError
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
import json import json
import pytest
from botocore.exceptions import ClientError
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.ec2 import utils as ec2_utils from moto.ec2 import utils as ec2_utils
from moto import mock_ecs, mock_ec2 from moto import mock_ecs, mock_ec2
import pytest
from tests import EXAMPLE_AMI_ID from tests import EXAMPLE_AMI_ID
@ -16,7 +14,7 @@ def test_list_account_settings_initial():
client = boto3.client("ecs", region_name="eu-west-1") client = boto3.client("ecs", region_name="eu-west-1")
resp = client.list_account_settings() resp = client.list_account_settings()
resp.should.have.key("settings").equal([]) assert resp["settings"] == []
@mock_ecs @mock_ecs
@ -29,8 +27,7 @@ def test_put_account_setting(name, value):
client = boto3.client("ecs", region_name="eu-west-1") client = boto3.client("ecs", region_name="eu-west-1")
resp = client.put_account_setting(name=name, value=value) resp = client.put_account_setting(name=name, value=value)
resp.should.have.key("setting") assert resp["setting"] == {"name": name, "value": value}
resp["setting"].should.equal({"name": name, "value": value})
@mock_ecs @mock_ecs
@ -42,27 +39,23 @@ def test_list_account_setting():
client.put_account_setting(name="taskLongArnFormat", value="enabled") client.put_account_setting(name="taskLongArnFormat", value="enabled")
resp = client.list_account_settings() resp = client.list_account_settings()
resp.should.have.key("settings").length_of(3) assert len(resp["settings"]) == 3
resp["settings"].should.contain( assert {"name": "containerInstanceLongArnFormat", "value": "enabled"} in resp[
{"name": "containerInstanceLongArnFormat", "value": "enabled"} "settings"
) ]
resp["settings"].should.contain( assert {"name": "serviceLongArnFormat", "value": "disabled"} in resp["settings"]
{"name": "serviceLongArnFormat", "value": "disabled"} assert {"name": "taskLongArnFormat", "value": "enabled"} in resp["settings"]
)
resp["settings"].should.contain({"name": "taskLongArnFormat", "value": "enabled"})
resp = client.list_account_settings(name="serviceLongArnFormat") resp = client.list_account_settings(name="serviceLongArnFormat")
resp.should.have.key("settings").length_of(1) assert len(resp["settings"]) == 1
resp["settings"].should.contain( assert {"name": "serviceLongArnFormat", "value": "disabled"} in resp["settings"]
{"name": "serviceLongArnFormat", "value": "disabled"}
)
resp = client.list_account_settings(value="enabled") resp = client.list_account_settings(value="enabled")
resp.should.have.key("settings").length_of(2) assert len(resp["settings"]) == 2
resp["settings"].should.contain( assert {"name": "containerInstanceLongArnFormat", "value": "enabled"} in resp[
{"name": "containerInstanceLongArnFormat", "value": "enabled"} "settings"
) ]
resp["settings"].should.contain({"name": "taskLongArnFormat", "value": "enabled"}) assert {"name": "taskLongArnFormat", "value": "enabled"} in resp["settings"]
@mock_ecs @mock_ecs
@ -72,9 +65,10 @@ def test_list_account_settings_wrong_name():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.list_account_settings(name="unknown") client.list_account_settings(name="unknown")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterException") assert err["Code"] == "InvalidParameterException"
err["Message"].should.equal( assert (
"unknown should be one of [serviceLongArnFormat,taskLongArnFormat,containerInstanceLongArnFormat,containerLongArnFormat,awsvpcTrunking,containerInsights,dualStackIPv6]" err["Message"]
== "unknown should be one of [serviceLongArnFormat,taskLongArnFormat,containerInstanceLongArnFormat,containerLongArnFormat,awsvpcTrunking,containerInsights,dualStackIPv6]"
) )
@ -87,16 +81,16 @@ def test_delete_account_setting():
client.put_account_setting(name="taskLongArnFormat", value="enabled") client.put_account_setting(name="taskLongArnFormat", value="enabled")
resp = client.list_account_settings() resp = client.list_account_settings()
resp.should.have.key("settings").length_of(3) assert len(resp["settings"]) == 3
client.delete_account_setting(name="serviceLongArnFormat") client.delete_account_setting(name="serviceLongArnFormat")
resp = client.list_account_settings() resp = client.list_account_settings()
resp.should.have.key("settings").length_of(2) assert len(resp["settings"]) == 2
resp["settings"].should.contain( assert {"name": "containerInstanceLongArnFormat", "value": "enabled"} in resp[
{"name": "containerInstanceLongArnFormat", "value": "enabled"} "settings"
) ]
resp["settings"].should.contain({"name": "taskLongArnFormat", "value": "enabled"}) assert {"name": "taskLongArnFormat", "value": "enabled"} in resp["settings"]
@mock_ec2 @mock_ec2
@ -129,16 +123,15 @@ def test_put_account_setting_changes_service_arn():
# Initial response is short (setting serviceLongArnFormat=disabled) # Initial response is short (setting serviceLongArnFormat=disabled)
response = client.list_services(cluster="dummy-cluster", launchType="FARGATE") response = client.list_services(cluster="dummy-cluster", launchType="FARGATE")
service_arn = response["serviceArns"][0] service_arn = response["serviceArns"][0]
service_arn.should.equal( assert service_arn == f"arn:aws:ecs:eu-west-1:{ACCOUNT_ID}:service/test-ecs-service"
f"arn:aws:ecs:eu-west-1:{ACCOUNT_ID}:service/test-ecs-service"
)
# Second invocation returns long ARN's by default, after deleting the preference # Second invocation returns long ARN's by default, after deleting the preference
client.delete_account_setting(name="serviceLongArnFormat") client.delete_account_setting(name="serviceLongArnFormat")
response = client.list_services(cluster="dummy-cluster", launchType="FARGATE") response = client.list_services(cluster="dummy-cluster", launchType="FARGATE")
service_arn = response["serviceArns"][0] service_arn = response["serviceArns"][0]
service_arn.should.equal( assert (
f"arn:aws:ecs:eu-west-1:{ACCOUNT_ID}:service/dummy-cluster/test-ecs-service" service_arn
== f"arn:aws:ecs:eu-west-1:{ACCOUNT_ID}:service/dummy-cluster/test-ecs-service"
) )
@ -165,8 +158,8 @@ def test_put_account_setting_changes_containerinstance_arn():
cluster=test_cluster_name, instanceIdentityDocument=instance_id_document cluster=test_cluster_name, instanceIdentityDocument=instance_id_document
) )
full_arn = response["containerInstance"]["containerInstanceArn"] full_arn = response["containerInstance"]["containerInstanceArn"]
full_arn.should.match( assert full_arn.startswith(
f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:container-instance/{test_cluster_name}/[a-z0-9-]+$" f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:container-instance/{test_cluster_name}/"
) )
# Now disable long-format # Now disable long-format
@ -177,8 +170,8 @@ def test_put_account_setting_changes_containerinstance_arn():
cluster=test_cluster_name, instanceIdentityDocument=instance_id_document cluster=test_cluster_name, instanceIdentityDocument=instance_id_document
) )
full_arn = response["containerInstance"]["containerInstanceArn"] full_arn = response["containerInstance"]["containerInstanceArn"]
full_arn.should.match( assert full_arn.startswith(
f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:container-instance/[a-z0-9-]+$" f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:container-instance/"
) )
@ -224,8 +217,8 @@ def test_run_task_default_cluster_new_arn_format():
count=1, count=1,
startedBy="moto", startedBy="moto",
) )
response["tasks"][0]["taskArn"].should.match( assert response["tasks"][0]["taskArn"].startswith(
f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:task/{test_cluster_name}/[a-z0-9-]+$" f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:task/{test_cluster_name}/"
) )
# Enable short-format for the next task # Enable short-format for the next task
@ -237,6 +230,6 @@ def test_run_task_default_cluster_new_arn_format():
count=1, count=1,
startedBy="moto", startedBy="moto",
) )
response["tasks"][0]["taskArn"].should.match( assert response["tasks"][0]["taskArn"].startswith(
f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:task/[a-z0-9-]+$" f"arn:aws:ecs:us-east-1:{ACCOUNT_ID}:task/"
) )

File diff suppressed because it is too large Load Diff

View File

@ -19,25 +19,22 @@ def test_create_capacity_provider():
"managedTerminationProtection": "DISABLED", "managedTerminationProtection": "DISABLED",
}, },
) )
resp.should.have.key("capacityProvider")
provider = resp["capacityProvider"] provider = resp["capacityProvider"]
provider.should.have.key("capacityProviderArn") assert "capacityProviderArn" in provider
provider.should.have.key("name").equals("my_provider") assert provider["name"] == "my_provider"
provider.should.have.key("status").equals("ACTIVE") assert provider["status"] == "ACTIVE"
provider.should.have.key("autoScalingGroupProvider").equals( assert provider["autoScalingGroupProvider"] == {
{ "autoScalingGroupArn": "asg:arn",
"autoScalingGroupArn": "asg:arn", "managedScaling": {
"managedScaling": { "instanceWarmupPeriod": 300,
"instanceWarmupPeriod": 300, "status": "ENABLED",
"status": "ENABLED", "targetCapacity": 5,
"targetCapacity": 5, "minimumScalingStepSize": 1,
"minimumScalingStepSize": 1, "maximumScalingStepSize": 2,
"maximumScalingStepSize": 2, },
}, "managedTerminationProtection": "DISABLED",
"managedTerminationProtection": "DISABLED", }
}
)
@mock_ecs @mock_ecs
@ -48,26 +45,25 @@ def test_create_capacity_provider_with_tags():
autoScalingGroupProvider={"autoScalingGroupArn": "asg:arn"}, autoScalingGroupProvider={"autoScalingGroupArn": "asg:arn"},
tags=[{"key": "k1", "value": "v1"}], tags=[{"key": "k1", "value": "v1"}],
) )
resp.should.have.key("capacityProvider")
provider = resp["capacityProvider"] provider = resp["capacityProvider"]
provider.should.have.key("capacityProviderArn") assert "capacityProviderArn" in provider
provider.should.have.key("name").equals("my_provider") assert provider["name"] == "my_provider"
provider.should.have.key("tags").equals([{"key": "k1", "value": "v1"}]) assert provider["tags"] == [{"key": "k1", "value": "v1"}]
client.tag_resource( client.tag_resource(
resourceArn=provider["capacityProviderArn"], tags=[{"key": "k2", "value": "v2"}] resourceArn=provider["capacityProviderArn"], tags=[{"key": "k2", "value": "v2"}]
) )
resp = client.list_tags_for_resource(resourceArn=provider["capacityProviderArn"]) resp = client.list_tags_for_resource(resourceArn=provider["capacityProviderArn"])
resp["tags"].should.have.length_of(2) assert len(resp["tags"]) == 2
resp["tags"].should.contain({"key": "k1", "value": "v1"}) assert {"key": "k1", "value": "v1"} in resp["tags"]
resp["tags"].should.contain({"key": "k2", "value": "v2"}) assert {"key": "k2", "value": "v2"} in resp["tags"]
client.untag_resource(resourceArn=provider["capacityProviderArn"], tagKeys=["k1"]) client.untag_resource(resourceArn=provider["capacityProviderArn"], tagKeys=["k1"])
resp = client.list_tags_for_resource(resourceArn=provider["capacityProviderArn"]) resp = client.list_tags_for_resource(resourceArn=provider["capacityProviderArn"])
resp["tags"].should.equal([{"key": "k2", "value": "v2"}]) assert resp["tags"] == [{"key": "k2", "value": "v2"}]
@mock_ecs @mock_ecs
@ -87,25 +83,23 @@ def test_describe_capacity_provider__using_name():
) )
resp = client.describe_capacity_providers(capacityProviders=["my_provider"]) resp = client.describe_capacity_providers(capacityProviders=["my_provider"])
resp.should.have.key("capacityProviders").length_of(1) assert len(resp["capacityProviders"]) == 1
provider = resp["capacityProviders"][0] provider = resp["capacityProviders"][0]
provider.should.have.key("capacityProviderArn") assert "capacityProviderArn" in provider
provider.should.have.key("name").equals("my_provider") assert provider["name"] == "my_provider"
provider.should.have.key("status").equals("ACTIVE") assert provider["status"] == "ACTIVE"
provider.should.have.key("autoScalingGroupProvider").equals( assert provider["autoScalingGroupProvider"] == {
{ "autoScalingGroupArn": "asg:arn",
"autoScalingGroupArn": "asg:arn", "managedScaling": {
"managedScaling": { "instanceWarmupPeriod": 300,
"instanceWarmupPeriod": 300, "status": "ENABLED",
"status": "ENABLED", "targetCapacity": 5,
"targetCapacity": 5, "minimumScalingStepSize": 1,
"minimumScalingStepSize": 1, "maximumScalingStepSize": 2,
"maximumScalingStepSize": 2, },
}, "managedTerminationProtection": "DISABLED",
"managedTerminationProtection": "DISABLED", }
}
)
@mock_ecs @mock_ecs
@ -125,10 +119,10 @@ def test_describe_capacity_provider__using_arn():
)["capacityProvider"]["capacityProviderArn"] )["capacityProvider"]["capacityProviderArn"]
resp = client.describe_capacity_providers(capacityProviders=[provider_arn]) resp = client.describe_capacity_providers(capacityProviders=[provider_arn])
resp.should.have.key("capacityProviders").length_of(1) assert len(resp["capacityProviders"]) == 1
provider = resp["capacityProviders"][0] provider = resp["capacityProviders"][0]
provider.should.have.key("name").equals("my_provider") assert provider["name"] == "my_provider"
@mock_ecs @mock_ecs
@ -150,14 +144,13 @@ def test_describe_capacity_provider__missing():
resp = client.describe_capacity_providers( resp = client.describe_capacity_providers(
capacityProviders=["my_provider", "another_provider"] capacityProviders=["my_provider", "another_provider"]
) )
resp.should.have.key("capacityProviders").length_of(1) assert len(resp["capacityProviders"]) == 1
resp.should.have.key("failures").length_of(1) assert resp["failures"] == [
resp["failures"].should.contain(
{ {
"arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/another_provider", "arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/another_provider",
"reason": "MISSING", "reason": "MISSING",
} }
) ]
@mock_ecs @mock_ecs
@ -168,27 +161,22 @@ def test_delete_capacity_provider():
) )
resp = client.delete_capacity_provider(capacityProvider="my_provider") resp = client.delete_capacity_provider(capacityProvider="my_provider")
resp.should.have.key("capacityProvider") assert resp["capacityProvider"]["name"] == "my_provider"
resp["capacityProvider"].should.have.key("name").equals("my_provider")
# We can't find either provider # We can't find either provider
resp = client.describe_capacity_providers( resp = client.describe_capacity_providers(
capacityProviders=["my_provider", "another_provider"] capacityProviders=["my_provider", "another_provider"]
) )
resp.should.have.key("capacityProviders").length_of(0) assert resp["capacityProviders"] == []
resp.should.have.key("failures").length_of(2) assert len(resp["failures"]) == 2
resp["failures"].should.contain( assert {
{ "arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/another_provider",
"arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/another_provider", "reason": "MISSING",
"reason": "MISSING", } in resp["failures"]
} assert {
) "arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/my_provider",
resp["failures"].should.contain( "reason": "MISSING",
{ } in resp["failures"]
"arn": f"arn:aws:ecs:us-west-1:{ACCOUNT_ID}:capacity_provider/my_provider",
"reason": "MISSING",
}
)
@mock_ecs @mock_ecs
@ -202,23 +190,20 @@ def test_update_capacity_provider():
name="my_provider", name="my_provider",
autoScalingGroupProvider={"managedScaling": {"status": "ENABLED"}}, autoScalingGroupProvider={"managedScaling": {"status": "ENABLED"}},
) )
resp.should.have.key("capacityProvider") assert resp["capacityProvider"]["name"] == "my_provider"
resp["capacityProvider"].should.have.key("name").equals("my_provider")
# We can't find either provider # We can't find either provider
provider = client.describe_capacity_providers(capacityProviders=["my_provider"])[ provider = client.describe_capacity_providers(capacityProviders=["my_provider"])[
"capacityProviders" "capacityProviders"
][0] ][0]
provider["autoScalingGroupProvider"].should.equal( assert provider["autoScalingGroupProvider"] == {
{ "autoScalingGroupArn": "asg:arn",
"autoScalingGroupArn": "asg:arn", "managedScaling": {
"managedScaling": { "instanceWarmupPeriod": 300,
"instanceWarmupPeriod": 300, "maximumScalingStepSize": 10000,
"maximumScalingStepSize": 10000, "minimumScalingStepSize": 1,
"minimumScalingStepSize": 1, "status": "ENABLED",
"status": "ENABLED", "targetCapacity": 100,
"targetCapacity": 100, },
}, "managedTerminationProtection": "DISABLED",
"managedTerminationProtection": "DISABLED", }
}
)

View File

@ -3,7 +3,6 @@ import json
from copy import deepcopy from copy import deepcopy
from moto import mock_cloudformation, mock_ecs from moto import mock_cloudformation, mock_ecs
from moto.core.utils import pascal_to_camelcase, remap_nested_keys from moto.core.utils import pascal_to_camelcase, remap_nested_keys
import sure # noqa # pylint: disable=unused-import
@mock_ecs @mock_ecs
@ -44,8 +43,8 @@ def test_update_task_definition_family_through_cloudformation_should_trigger_a_r
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2") resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2")
len(resp["taskDefinitionArns"]).should.equal(1) assert len(resp["taskDefinitionArns"]) == 1
resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1").should.be.true assert resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1")
@mock_ecs @mock_ecs
@ -90,7 +89,7 @@ def test_create_service_through_cloudformation():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster") resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1) assert len(resp["serviceArns"]) == 1
@mock_ecs @mock_ecs
@ -135,7 +134,7 @@ def test_create_service_through_cloudformation_without_desiredcount():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster") resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1) assert len(resp["serviceArns"]) == 1
@mock_ecs @mock_ecs
@ -184,7 +183,7 @@ def test_update_service_through_cloudformation_should_trigger_replacement():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster") resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1) assert len(resp["serviceArns"]) == 1
@mock_ecs @mock_ecs
@ -234,7 +233,7 @@ def test_update_service_through_cloudformation_without_desiredcount():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster") resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1) assert len(resp["serviceArns"]) == 1
@mock_ecs @mock_ecs
@ -254,13 +253,13 @@ def test_create_cluster_through_cloudformation():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters() resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(0) assert len(resp["clusterArns"]) == 0
cfn_conn = boto3.client("cloudformation", region_name="us-west-1") cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json) cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = ecs_conn.list_clusters() resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1) assert len(resp["clusterArns"]) == 1
@mock_ecs @mock_ecs
@ -279,7 +278,7 @@ def test_create_cluster_through_cloudformation_no_name():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters() resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1) assert len(resp["clusterArns"]) == 1
@mock_ecs @mock_ecs
@ -309,13 +308,13 @@ def test_update_cluster_name_through_cloudformation_should_trigger_a_replacement
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters() resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(2) assert len(resp["clusterArns"]) == 2
cluster1 = ecs_conn.describe_clusters(clusters=["testcluster1"])["clusters"][0] cluster1 = ecs_conn.describe_clusters(clusters=["testcluster1"])["clusters"][0]
cluster1["status"].should.equal("INACTIVE") assert cluster1["status"] == "INACTIVE"
cluster1 = ecs_conn.describe_clusters(clusters=["testcluster2"])["clusters"][0] cluster1 = ecs_conn.describe_clusters(clusters=["testcluster2"])["clusters"][0]
cluster1["status"].should.equal("ACTIVE") assert cluster1["status"] == "ACTIVE"
@mock_ecs @mock_ecs
@ -356,13 +355,13 @@ def test_create_task_definition_through_cloudformation():
ecs_conn = boto3.client("ecs", region_name="us-west-1") ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions() resp = ecs_conn.list_task_definitions()
len(resp["taskDefinitionArns"]).should.equal(1) assert len(resp["taskDefinitionArns"]) == 1
task_definition_arn = resp["taskDefinitionArns"][0] task_definition_arn = resp["taskDefinitionArns"][0]
task_definition_details = cfn_conn.describe_stack_resource( task_definition_details = cfn_conn.describe_stack_resource(
StackName=stack_name, LogicalResourceId="testTaskDefinition" StackName=stack_name, LogicalResourceId="testTaskDefinition"
)["StackResourceDetail"] )["StackResourceDetail"]
task_definition_details["PhysicalResourceId"].should.equal(task_definition_arn) assert task_definition_details["PhysicalResourceId"] == task_definition_arn
task_definition = ecs_conn.describe_task_definition( task_definition = ecs_conn.describe_task_definition(
taskDefinition=task_definition_arn taskDefinition=task_definition_arn
@ -370,6 +369,6 @@ def test_create_task_definition_through_cloudformation():
expected_properties = remap_nested_keys( expected_properties = remap_nested_keys(
template["Resources"]["testTaskDefinition"]["Properties"], pascal_to_camelcase template["Resources"]["testTaskDefinition"]["Properties"], pascal_to_camelcase
) )
task_definition["volumes"].should.equal(expected_properties["volumes"]) assert task_definition["volumes"] == expected_properties["volumes"]
for key, value in expected_properties["containerDefinitions"][0].items(): for key, value in expected_properties["containerDefinitions"][0].items():
task_definition["containerDefinitions"][0][key].should.equal(value) assert task_definition["containerDefinitions"][0][key] == value

View File

@ -29,15 +29,13 @@ def test_register_task_definition__use_efs_root():
family = task_definition["taskDefinition"]["family"] family = task_definition["taskDefinition"]["family"]
task = client.describe_task_definition(taskDefinition=family)["taskDefinition"] task = client.describe_task_definition(taskDefinition=family)["taskDefinition"]
task["volumes"].should.equal( assert task["volumes"] == [
[ {
{ "name": "vol1",
"name": "vol1", "efsVolumeConfiguration": {
"efsVolumeConfiguration": { "fileSystemId": "sth",
"fileSystemId": "sth", "rootDirectory": "/",
"rootDirectory": "/", "transitEncryption": "ENABLED",
"transitEncryption": "ENABLED", },
}, }
} ]
]
)

View File

@ -1,5 +1,4 @@
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_ecs from moto import mock_ecs
@ -25,20 +24,18 @@ def test_describe_task_definition_with_tags():
response = client.describe_task_definition( response = client.describe_task_definition(
taskDefinition="test_ecs_task:1", include=["TAGS"] taskDefinition="test_ecs_task:1", include=["TAGS"]
) )
response["tags"].should.equal([{"key": "k1", "value": "v1"}]) assert response["tags"] == [{"key": "k1", "value": "v1"}]
client.tag_resource(resourceArn=task_def_arn, tags=[{"key": "k2", "value": "v2"}]) client.tag_resource(resourceArn=task_def_arn, tags=[{"key": "k2", "value": "v2"}])
response = client.describe_task_definition( response = client.describe_task_definition(
taskDefinition="test_ecs_task:1", include=["TAGS"] taskDefinition="test_ecs_task:1", include=["TAGS"]
) )
response["tags"].should.have.length_of(2) assert len(response["tags"]) == 2
response["tags"].should.contain({"key": "k1", "value": "v1"}) assert {"key": "k1", "value": "v1"} in response["tags"]
response["tags"].should.contain({"key": "k2", "value": "v2"}) assert {"key": "k2", "value": "v2"} in response["tags"]
client.untag_resource(resourceArn=task_def_arn, tagKeys=["k2"]) client.untag_resource(resourceArn=task_def_arn, tagKeys=["k2"])
resp = client.list_tags_for_resource(resourceArn=task_def_arn) resp = client.list_tags_for_resource(resourceArn=task_def_arn)
resp.should.have.key("tags") assert resp["tags"] == [{"key": "k1", "value": "v1"}]
resp["tags"].should.have.length_of(1)
resp["tags"].should.contain({"key": "k1", "value": "v1"})

View File

@ -1,6 +1,5 @@
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_ecs from moto import mock_ecs
import pytest import pytest
@ -44,18 +43,18 @@ def test_create_task_set():
service_arn = client.describe_services( service_arn = client.describe_services(
cluster=cluster_name, services=[service_name] cluster=cluster_name, services=[service_name]
)["services"][0]["serviceArn"] )["services"][0]["serviceArn"]
task_set["clusterArn"].should.equal(cluster_arn) assert task_set["clusterArn"] == cluster_arn
task_set["serviceArn"].should.equal(service_arn) assert task_set["serviceArn"] == service_arn
task_set["taskDefinition"].should.match(f"{task_def_name}:1$") assert task_set["taskDefinition"].endswith(f"{task_def_name}:1")
task_set["scale"].should.equal({"value": 100.0, "unit": "PERCENT"}) assert task_set["scale"] == {"value": 100.0, "unit": "PERCENT"}
task_set["loadBalancers"][0]["targetGroupArn"].should.equal( assert (
"arn:aws:elasticloadbalancing:us-east-1:01234567890:targetgroup/" task_set["loadBalancers"][0]["targetGroupArn"]
"c26b93c1bc35466ba792d5b08fe6a5bc/ec39113f8831453a" == "arn:aws:elasticloadbalancing:us-east-1:01234567890:targetgroup/c26b93c1bc35466ba792d5b08fe6a5bc/ec39113f8831453a"
) )
task_set["loadBalancers"][0]["containerPort"].should.equal(8080) assert task_set["loadBalancers"][0]["containerPort"] == 8080
task_set["loadBalancers"][0]["containerName"].should.equal("hello_world") assert task_set["loadBalancers"][0]["containerName"] == "hello_world"
task_set["launchType"].should.equal("EC2") assert task_set["launchType"] == "EC2"
task_set["platformVersion"].should.equal("LATEST") assert task_set["platformVersion"] == "LATEST"
@mock_ecs @mock_ecs
@ -84,11 +83,11 @@ def test_create_task_set_errors():
# then # then
ex = e.value ex = e.value
ex.operation_name.should.equal("CreateTaskSet") assert ex.operation_name == "CreateTaskSet"
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
ex.response["Error"]["Code"].should.contain("ClientException") assert ex.response["Error"]["Code"] == "ClientException"
ex.response["Error"]["Message"].should.equal( assert (
"launch type should be one of [EC2,FARGATE]" ex.response["Error"]["Message"] == "launch type should be one of [EC2,FARGATE]"
) )
@ -136,21 +135,21 @@ def test_describe_task_sets():
cluster=cluster_name, services=[service_name] cluster=cluster_name, services=[service_name]
)["services"][0]["serviceArn"] )["services"][0]["serviceArn"]
task_sets.should.have.length_of(1) assert len(task_sets) == 1
task_sets[0].should.have.key("tags") assert "tags" in task_sets[0]
task_sets[0]["taskDefinition"].should.match(f"{task_def_name}:1$") assert task_sets[0]["taskDefinition"].endswith(f"{task_def_name}:1")
task_sets[0]["clusterArn"].should.equal(cluster_arn) assert task_sets[0]["clusterArn"] == cluster_arn
task_sets[0]["serviceArn"].should.equal(service_arn) assert task_sets[0]["serviceArn"] == service_arn
task_sets[0]["serviceArn"].should.match(f"{service_name}$") assert task_sets[0]["serviceArn"].endswith(f"{service_name}")
task_sets[0]["scale"].should.equal({"value": 100.0, "unit": "PERCENT"}) assert task_sets[0]["scale"] == {"value": 100.0, "unit": "PERCENT"}
task_sets[0]["taskSetArn"].should.match(f"{task_sets[0]['id']}$") assert task_sets[0]["taskSetArn"].endswith(f"{task_sets[0]['id']}")
task_sets[0]["loadBalancers"][0]["targetGroupArn"].should.equal( assert (
"arn:aws:elasticloadbalancing:us-east-1:01234567890:targetgroup/" task_sets[0]["loadBalancers"][0]["targetGroupArn"]
"c26b93c1bc35466ba792d5b08fe6a5bc/ec39113f8831453a" == "arn:aws:elasticloadbalancing:us-east-1:01234567890:targetgroup/c26b93c1bc35466ba792d5b08fe6a5bc/ec39113f8831453a"
) )
task_sets[0]["loadBalancers"][0]["containerPort"].should.equal(8080) assert task_sets[0]["loadBalancers"][0]["containerPort"] == 8080
task_sets[0]["loadBalancers"][0]["containerName"].should.equal("hello_world") assert task_sets[0]["loadBalancers"][0]["containerName"] == "hello_world"
task_sets[0]["launchType"].should.equal("EC2") assert task_sets[0]["launchType"] == "EC2"
@mock_ecs @mock_ecs
@ -194,7 +193,7 @@ def test_delete_task_set():
assert len(task_sets) == 0 assert len(task_sets) == 0
with pytest.raises(ClientError): with pytest.raises(ClientError):
_ = client.delete_task_set( client.delete_task_set(
cluster=cluster_name, service=service_name, taskSet=task_set["taskSetArn"] cluster=cluster_name, service=service_name, taskSet=task_set["taskSetArn"]
) )
@ -248,7 +247,7 @@ def test_update_service_primary_task_set():
cluster=cluster_name, service=service_name, taskDefinition=task_def_name cluster=cluster_name, service=service_name, taskDefinition=task_def_name
)["taskSet"] )["taskSet"]
service = client.describe_services(cluster=cluster_name, services=[service_name],)[ service = client.describe_services(cluster=cluster_name, services=[service_name])[
"services" "services"
][0] ][0]
@ -258,7 +257,7 @@ def test_update_service_primary_task_set():
primaryTaskSet=task_set["taskSetArn"], primaryTaskSet=task_set["taskSetArn"],
) )
service = client.describe_services(cluster=cluster_name, services=[service_name],)[ service = client.describe_services(cluster=cluster_name, services=[service_name])[
"services" "services"
][0] ][0]
assert service["taskSets"][0]["status"] == "PRIMARY" assert service["taskSets"][0]["status"] == "PRIMARY"
@ -267,7 +266,7 @@ def test_update_service_primary_task_set():
another_task_set = client.create_task_set( another_task_set = client.create_task_set(
cluster=cluster_name, service=service_name, taskDefinition=task_def_name cluster=cluster_name, service=service_name, taskDefinition=task_def_name
)["taskSet"] )["taskSet"]
service = client.describe_services(cluster=cluster_name, services=[service_name],)[ service = client.describe_services(cluster=cluster_name, services=[service_name])[
"services" "services"
][0] ][0]
assert service["taskSets"][1]["status"] == "ACTIVE" assert service["taskSets"][1]["status"] == "ACTIVE"
@ -277,7 +276,7 @@ def test_update_service_primary_task_set():
service=service_name, service=service_name,
primaryTaskSet=another_task_set["taskSetArn"], primaryTaskSet=another_task_set["taskSetArn"],
) )
service = client.describe_services(cluster=cluster_name, services=[service_name],)[ service = client.describe_services(cluster=cluster_name, services=[service_name])[
"services" "services"
][0] ][0]
assert service["taskSets"][0]["status"] == "ACTIVE" assert service["taskSets"][0]["status"] == "ACTIVE"
@ -353,9 +352,10 @@ def test_create_task_sets_with_tags():
task_set = client.describe_task_sets( task_set = client.describe_task_sets(
cluster=cluster_name, service=service_name, include=["TAGS"] cluster=cluster_name, service=service_name, include=["TAGS"]
)["taskSets"][0] )["taskSets"][0]
task_set.should.have.key("tags").equals( assert task_set["tags"] == [
[{"key": "k1", "value": "v1"}, {"key": "k2", "value": "v2"}] {"key": "k1", "value": "v1"},
) {"key": "k2", "value": "v2"},
]
client.tag_resource( client.tag_resource(
resourceArn=task_set["taskSetArn"], tags=[{"key": "k3", "value": "v3"}] resourceArn=task_set["taskSetArn"], tags=[{"key": "k3", "value": "v3"}]
@ -364,19 +364,17 @@ def test_create_task_sets_with_tags():
task_set = client.describe_task_sets( task_set = client.describe_task_sets(
cluster=cluster_name, service=service_name, include=["TAGS"] cluster=cluster_name, service=service_name, include=["TAGS"]
)["taskSets"][0] )["taskSets"][0]
task_set.should.have.key("tags") assert len(task_set["tags"]) == 3
task_set["tags"].should.have.length_of(3) assert {"key": "k1", "value": "v1"} in task_set["tags"]
task_set["tags"].should.contain({"key": "k1", "value": "v1"}) assert {"key": "k2", "value": "v2"} in task_set["tags"]
task_set["tags"].should.contain({"key": "k2", "value": "v2"}) assert {"key": "k3", "value": "v3"} in task_set["tags"]
task_set["tags"].should.contain({"key": "k3", "value": "v3"})
client.untag_resource(resourceArn=task_set["taskSetArn"], tagKeys=["k2"]) client.untag_resource(resourceArn=task_set["taskSetArn"], tagKeys=["k2"])
resp = client.list_tags_for_resource(resourceArn=task_set["taskSetArn"]) resp = client.list_tags_for_resource(resourceArn=task_set["taskSetArn"])
resp.should.have.key("tags") assert len(resp["tags"]) == 2
resp["tags"].should.have.length_of(2) assert {"key": "k1", "value": "v1"} in resp["tags"]
resp["tags"].should.contain({"key": "k1", "value": "v1"}) assert {"key": "k3", "value": "v3"} in resp["tags"]
resp["tags"].should.contain({"key": "k3", "value": "v3"})
def create_task_def(client): def create_task_def(client):