From ca83236da6cded88511ba3ddd8fd149b784d3090 Mon Sep 17 00:00:00 2001 From: kbalk <7536198+kbalk@users.noreply.github.com> Date: Sat, 19 Aug 2023 13:58:06 -0400 Subject: [PATCH] Techdebt: Replace sure with regular assertions in RedShift (#6690) --- moto/apigateway/models.py | 17 +- moto/athena/models.py | 9 +- moto/rdsdata/models.py | 6 +- moto/redshift/models.py | 61 +- requirements-tests.txt | 1 - tests/test_core/test_importorder.py | 11 +- tests/test_redshift/test_redshift.py | 857 +++++++++--------- .../test_redshift_cloudformation.py | 24 +- tests/test_redshift/test_server.py | 160 ++-- 9 files changed, 591 insertions(+), 555 deletions(-) diff --git a/moto/apigateway/models.py b/moto/apigateway/models.py index a8db6431e..7e90765a2 100644 --- a/moto/apigateway/models.py +++ b/moto/apigateway/models.py @@ -1,14 +1,15 @@ -import string -import re -import responses -import requests -import time -from datetime import datetime from collections import defaultdict -from openapi_spec_validator import validate_spec +from datetime import datetime +import re +import string +import time from typing import Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse +from openapi_spec_validator import validate_spec +import requests +import responses + try: from openapi_spec_validator.validation.exceptions import OpenAPIValidationError except ImportError: @@ -1511,7 +1512,7 @@ class APIGatewayBackend(BaseBackend): integrationHttpMethod="GET" ) deploy_url = f"https://{api_id}.execute-api.us-east-1.amazonaws.com/dev" - requests.get(deploy_url).content.should.equal(b"a fake response") + assert requests.get(deploy_url).content == b"a fake response" Limitations: - Integrations of type HTTP are supported diff --git a/moto/athena/models.py b/moto/athena/models.py index 09247a034..4764116fd 100644 --- a/moto/athena/models.py +++ b/moto/athena/models.py @@ -1,12 +1,13 @@ -import time from datetime import datetime +import time +from typing import Any, Dict, List, Optional + from moto.core import BaseBackend, BackendDict, BaseModel from moto.moto_api._internal import mock_random from moto.utilities.paginator import paginate -from typing import Any, Dict, List, Optional -class TaggableResourceMixin(object): +class TaggableResourceMixin: # This mixing was copied from Redshift when initially implementing # Athena. TBD if it's worth the overhead. @@ -263,7 +264,7 @@ class AthenaBackend(BaseBackend): "http://motoapi.amazonaws.com:5000/moto-api/static/athena/query-results", json=expected_results, ) - resp.status_code.should.equal(201) + assert resp.status_code == 201 client = boto3.client("athena", region_name="us-east-1") details = client.get_query_execution(QueryExecutionId="any_id")["QueryExecution"] diff --git a/moto/rdsdata/models.py b/moto/rdsdata/models.py index bb85a6052..45964f8a2 100644 --- a/moto/rdsdata/models.py +++ b/moto/rdsdata/models.py @@ -1,7 +1,7 @@ -from moto.core import BaseBackend, BackendDict - from typing import Any, Dict, List, Optional, Tuple +from moto.core import BaseBackend, BackendDict + class QueryResults: def __init__( @@ -66,7 +66,7 @@ class RDSDataServiceBackend(BaseBackend): "http://motoapi.amazonaws.com:5000/moto-api/static/rds-data/statement-results", json=expected_results, ) - resp.status_code.should.equal(201) + assert resp.status_code == 201 rdsdata = boto3.client("rds-data", region_name="us-east-1") resp = rdsdata.execute_statement(resourceArn="not applicable", secretArn="not applicable", sql="SELECT some FROM thing") diff --git a/moto/redshift/models.py b/moto/redshift/models.py index 9e6d1fe70..ff31fc59a 100644 --- a/moto/redshift/models.py +++ b/moto/redshift/models.py @@ -1,8 +1,10 @@ +from collections import OrderedDict import copy import datetime - -from collections import OrderedDict from typing import Any, Dict, Iterable, List, Optional + +from dateutil.tz import tzutc + from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel from moto.core.utils import iso_8601_datetime_with_milliseconds from moto.ec2 import ec2_backends @@ -46,7 +48,10 @@ class TaggableResourceMixin: @property def arn(self) -> str: - return f"arn:aws:redshift:{self.region}:{self.account_id}:{self.resource_type}:{self.resource_id}" + return ( + f"arn:aws:redshift:{self.region}:{self.account_id}" + f":{self.resource_type}:{self.resource_id}" + ) def create_tags(self, tags: List[Dict[str, str]]) -> List[Dict[str, str]]: new_keys = [tag_set["Key"] for tag_set in tags] @@ -95,7 +100,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel): self.redshift_backend = redshift_backend self.cluster_identifier = cluster_identifier self.create_time = iso_8601_datetime_with_milliseconds( - datetime.datetime.utcnow() + datetime.datetime.now(tzutc()) ) self.status = "available" self.node_type = node_type @@ -220,7 +225,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel): if attribute_name == "Endpoint.Address": return self.endpoint - elif attribute_name == "Endpoint.Port": + if attribute_name == "Endpoint.Port": return self.port raise UnformattedGetAttTemplateException() @@ -528,7 +533,7 @@ class Snapshot(TaggableResourceMixin, BaseModel): self.snapshot_type = snapshot_type self.status = "available" self.create_time = iso_8601_datetime_with_milliseconds( - datetime.datetime.utcnow() + datetime.datetime.now(tzutc()) ) self.iam_roles_arn = iam_roles_arn or [] @@ -622,8 +627,7 @@ class RedshiftBackend(BaseBackend): } cluster.cluster_snapshot_copy_status = status return cluster - else: - raise SnapshotCopyAlreadyEnabledFaultError(cluster_identifier) + raise SnapshotCopyAlreadyEnabledFaultError(cluster_identifier) def disable_snapshot_copy(self, **kwargs: Any) -> Cluster: cluster_identifier = kwargs["cluster_identifier"] @@ -631,8 +635,7 @@ class RedshiftBackend(BaseBackend): if cluster.cluster_snapshot_copy_status is not None: cluster.cluster_snapshot_copy_status = None return cluster - else: - raise SnapshotCopyAlreadyDisabledFaultError(cluster_identifier) + raise SnapshotCopyAlreadyDisabledFaultError(cluster_identifier) def modify_snapshot_copy_retention_period( self, cluster_identifier: str, retention_period: str @@ -650,7 +653,10 @@ class RedshiftBackend(BaseBackend): raise ClusterAlreadyExistsFaultError() cluster = Cluster(self, **cluster_kwargs) self.clusters[cluster_identifier] = cluster - snapshot_id = f"rs:{cluster_identifier}-{datetime.datetime.utcnow().strftime('%Y-%m-%d-%H-%M')}" + snapshot_id = ( + f"rs:{cluster_identifier}-" + f"{datetime.datetime.now(tzutc()).strftime('%Y-%m-%d-%H-%M')}" + ) # Automated snapshots don't copy over the tags self.create_cluster_snapshot( cluster_identifier, @@ -679,8 +685,7 @@ class RedshiftBackend(BaseBackend): if cluster_identifier: if cluster_identifier in self.clusters: return [self.clusters[cluster_identifier]] - else: - raise ClusterNotFoundError(cluster_identifier) + raise ClusterNotFoundError(cluster_identifier) return list(self.clusters.values()) def modify_cluster(self, **cluster_kwargs: Any) -> Cluster: @@ -739,9 +744,10 @@ class RedshiftBackend(BaseBackend): and cluster_snapshot_identifer is None ): raise InvalidParameterCombinationError( - "FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified." + "FinalClusterSnapshotIdentifier is required unless " + "SkipFinalClusterSnapshot is specified." ) - elif ( + if ( cluster_skip_final_snapshot is False and cluster_snapshot_identifer is not None ): # create snapshot @@ -781,8 +787,7 @@ class RedshiftBackend(BaseBackend): if subnet_identifier: if subnet_identifier in self.subnet_groups: return [self.subnet_groups[subnet_identifier]] - else: - raise ClusterSubnetGroupNotFoundError(subnet_identifier) + raise ClusterSubnetGroupNotFoundError(subnet_identifier) return list(self.subnet_groups.values()) def delete_cluster_subnet_group(self, subnet_identifier: str) -> SubnetGroup: @@ -812,8 +817,7 @@ class RedshiftBackend(BaseBackend): if security_group_name: if security_group_name in self.security_groups: return [self.security_groups[security_group_name]] - else: - raise ClusterSecurityGroupNotFoundError(security_group_name) + raise ClusterSecurityGroupNotFoundError(security_group_name) return list(self.security_groups.values()) def delete_cluster_security_group( @@ -861,8 +865,7 @@ class RedshiftBackend(BaseBackend): if parameter_group_name: if parameter_group_name in self.parameter_groups: return [self.parameter_groups[parameter_group_name]] - else: - raise ClusterParameterGroupNotFoundError(parameter_group_name) + raise ClusterParameterGroupNotFoundError(parameter_group_name) return list(self.parameter_groups.values()) def delete_cluster_parameter_group( @@ -983,8 +986,7 @@ class RedshiftBackend(BaseBackend): if snapshot_copy_grant_name: if snapshot_copy_grant_name in self.snapshot_copy_grants: return [self.snapshot_copy_grants[snapshot_copy_grant_name]] - else: - raise SnapshotCopyGrantNotFoundFaultError(snapshot_copy_grant_name) + raise SnapshotCopyGrantNotFoundFaultError(snapshot_copy_grant_name) return copy_grants def _get_resource_from_arn(self, arn: str) -> TaggableResourceMixin: @@ -1000,16 +1002,16 @@ class RedshiftBackend(BaseBackend): resources = self.RESOURCE_TYPE_MAP.get(resource_type) if resources is None: message = ( - f"Tagging is not supported for this type of resource: '{resource_type}' " - "(the ARN is potentially malformed, please check the ARN documentation for more information)" + "Tagging is not supported for this type of resource: " + f"'{resource_type}' (the ARN is potentially malformed, " + "please check the ARN documentation for more information)" ) raise ResourceNotFoundFaultError(message=message) try: resource = resources[resource_id] except KeyError: raise ResourceNotFoundFaultError(resource_type, resource_id) - else: - return resource + return resource @staticmethod def _describe_tags_for_resources(resources: Iterable[Any]) -> List[Dict[str, Any]]: # type: ignore[misc] @@ -1081,7 +1083,7 @@ class RedshiftBackend(BaseBackend): raise InvalidParameterValueError( "Token duration must be between 900 and 3600 seconds" ) - expiration = datetime.datetime.utcnow() + datetime.timedelta( + expiration = datetime.datetime.now(tzutc()) + datetime.timedelta( 0, duration_seconds ) if cluster_identifier in self.clusters: @@ -1092,8 +1094,7 @@ class RedshiftBackend(BaseBackend): "DbPassword": mock_random.get_random_string(32), "Expiration": expiration, } - else: - raise ClusterNotFoundError(cluster_identifier) + raise ClusterNotFoundError(cluster_identifier) redshift_backends = BackendDict(RedshiftBackend, "redshift") diff --git a/requirements-tests.txt b/requirements-tests.txt index f01abcfeb..5b1593c16 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -3,6 +3,5 @@ pytest pytest-cov pytest-ordering pytest-xdist -surer freezegun pylint diff --git a/tests/test_core/test_importorder.py b/tests/test_core/test_importorder.py index 526dd90ac..af704aa2c 100644 --- a/tests/test_core/test_importorder.py +++ b/tests/test_core/test_importorder.py @@ -1,15 +1,17 @@ +from unittest import SkipTest + import boto3 import pytest + from moto import mock_s3 from moto import settings -from unittest import SkipTest @pytest.fixture(scope="function", name="aws_credentials") def fixture_aws_credentials(monkeypatch): + """Mocked AWS Credentials for moto.""" if settings.TEST_SERVER_MODE: raise SkipTest("No point in testing this in ServerMode.") - """Mocked AWS Credentials for moto.""" monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing") monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing") monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing") @@ -63,8 +65,7 @@ def test_mock_works_with_resource_created_outside( patch_resource(outside_resource) - b = list(outside_resource.buckets.all()) - assert b == [] + assert not list(outside_resource.buckets.all()) m.stop() @@ -126,7 +127,7 @@ def test_mock_works_when_replacing_client( try: logic.do_important_things() except Exception as e: - str(e).should.contain("InvalidAccessKeyId") + assert str(e) == "InvalidAccessKeyId" client_initialized_after_mock = boto3.client("s3", region_name="us-east-1") logic._s3 = client_initialized_after_mock diff --git a/tests/test_redshift/test_redshift.py b/tests/test_redshift/test_redshift.py index 9ca7eacc8..2d6baa7a2 100644 --- a/tests/test_redshift/test_redshift.py +++ b/tests/test_redshift/test_redshift.py @@ -1,10 +1,11 @@ -import time import datetime +import re +import time import boto3 from botocore.exceptions import ClientError +from dateutil.tz import tzutc import pytest -import sure # noqa # pylint: disable=unused-import from moto import mock_ec2 from moto import mock_redshift @@ -23,38 +24,36 @@ def test_create_cluster_boto3(): MasterUserPassword="password", ) cluster = response["Cluster"] - cluster["ClusterIdentifier"].should.equal("test") - cluster["NodeType"].should.equal("ds2.xlarge") - cluster["ClusterStatus"].should.equal("creating") + assert cluster["ClusterIdentifier"] == "test" + assert cluster["NodeType"] == "ds2.xlarge" + assert cluster["ClusterStatus"] == "creating" create_time = cluster["ClusterCreateTime"] - create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo)) - create_time.should.be.greater_than( + assert create_time < datetime.datetime.now(create_time.tzinfo) + assert create_time > ( datetime.datetime.now(create_time.tzinfo) - datetime.timedelta(minutes=1) ) - cluster["MasterUsername"].should.equal("user") - cluster["DBName"].should.equal("test") - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(1) - cluster["ClusterSecurityGroups"].should.equal( - [{"ClusterSecurityGroupName": "Default", "Status": "active"}] - ) - cluster["VpcSecurityGroups"].should.equal([]) - cluster["ClusterParameterGroups"].should.equal( - [ - { - "ParameterGroupName": "default.redshift-1.0", - "ParameterApplyStatus": "in-sync", - } - ] - ) - cluster["ClusterSubnetGroupName"].should.equal("") - cluster["AvailabilityZone"].should.equal("us-east-1a") - cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:03:30") - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["NumberOfNodes"].should.equal(1) - cluster["EnhancedVpcRouting"].should.equal(False) - cluster["KmsKeyId"].should.equal("") - cluster["Endpoint"]["Port"].should.equal(5439) + assert cluster["MasterUsername"] == "user" + assert cluster["DBName"] == "test" + assert cluster["AutomatedSnapshotRetentionPeriod"] == 1 + assert cluster["ClusterSecurityGroups"] == [ + {"ClusterSecurityGroupName": "Default", "Status": "active"} + ] + assert cluster["VpcSecurityGroups"] == [] + assert cluster["ClusterParameterGroups"] == [ + { + "ParameterGroupName": "default.redshift-1.0", + "ParameterApplyStatus": "in-sync", + } + ] + assert cluster["ClusterSubnetGroupName"] == "" + assert cluster["AvailabilityZone"] == "us-east-1a" + assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:03:30" + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["NumberOfNodes"] == 1 + assert cluster["EnhancedVpcRouting"] is False + assert cluster["KmsKeyId"] == "" + assert cluster["Endpoint"]["Port"] == 5439 @mock_redshift @@ -69,13 +68,13 @@ def test_create_cluster_with_enhanced_vpc_routing_enabled(): MasterUserPassword="password", EnhancedVpcRouting=True, ) - response["Cluster"]["NodeType"].should.equal("ds2.xlarge") + assert response["Cluster"]["NodeType"] == "ds2.xlarge" create_time = response["Cluster"]["ClusterCreateTime"] - create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo)) - create_time.should.be.greater_than( + assert create_time < datetime.datetime.now(create_time.tzinfo) + assert create_time > ( datetime.datetime.now(create_time.tzinfo) - datetime.timedelta(minutes=1) ) - response["Cluster"]["EnhancedVpcRouting"].should.equal(True) + assert response["Cluster"]["EnhancedVpcRouting"] is True @mock_redshift @@ -93,14 +92,14 @@ def test_create_and_describe_cluster_with_kms_key_id(): MasterUserPassword="password", KmsKeyId=kms_key_id, ) - response["Cluster"]["KmsKeyId"].should.equal(kms_key_id) + assert response["Cluster"]["KmsKeyId"] == kms_key_id response = client.describe_clusters() clusters = response.get("Clusters", []) - len(clusters).should.equal(1) + assert len(clusters) == 1 cluster = clusters[0] - cluster["KmsKeyId"].should.equal(kms_key_id) + assert cluster["KmsKeyId"] == kms_key_id @mock_redshift @@ -109,14 +108,13 @@ def test_create_snapshot_copy_grant(): grants = client.create_snapshot_copy_grant( SnapshotCopyGrantName="test-us-east-1", KmsKeyId="fake" ) - grants["SnapshotCopyGrant"]["SnapshotCopyGrantName"].should.equal("test-us-east-1") - grants["SnapshotCopyGrant"]["KmsKeyId"].should.equal("fake") + assert grants["SnapshotCopyGrant"]["SnapshotCopyGrantName"] == "test-us-east-1" + assert grants["SnapshotCopyGrant"]["KmsKeyId"] == "fake" client.delete_snapshot_copy_grant(SnapshotCopyGrantName="test-us-east-1") - client.describe_snapshot_copy_grants.when.called_with( - SnapshotCopyGrantName="test-us-east-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + client.describe_snapshot_copy_grants(SnapshotCopyGrantName="test-us-east-1") @mock_redshift @@ -128,14 +126,14 @@ def test_create_many_snapshot_copy_grants(): SnapshotCopyGrantName=f"test-us-east-1-{i}", KmsKeyId="fake" ) response = client.describe_snapshot_copy_grants() - len(response["SnapshotCopyGrants"]).should.equal(10) + assert len(response["SnapshotCopyGrants"]) == 10 @mock_redshift def test_no_snapshot_copy_grants(): client = boto3.client("redshift", region_name="us-east-1") response = client.describe_snapshot_copy_grants() - len(response["SnapshotCopyGrants"]).should.equal(0) + assert len(response["SnapshotCopyGrants"]) == 0 @mock_redshift @@ -164,64 +162,63 @@ def test_create_cluster_all_attributes(): NumberOfNodes=3, ) cluster = cluster_response["Cluster"] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NodeType"].should.equal("dc1.large") - cluster["ClusterStatus"].should.equal("creating") - # cluster["ClusterAvailabilityStatus"].should.equal("Modifying") - cluster["MasterUsername"].should.equal("username") - cluster["DBName"].should.equal("my_db") - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(10) - # cluster["ManualSnapshotRetentionPeriod"].should.equal(-1) - # cluster["ClusterSecurityGroups"].should.equal([]) - cluster["ClusterParameterGroups"].should.have.length_of(1) + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NodeType"] == "dc1.large" + assert cluster["ClusterStatus"] == "creating" + # assert cluster["ClusterAvailabilityStatus"] == "Modifying" + assert cluster["MasterUsername"] == "username" + assert cluster["DBName"] == "my_db" + assert cluster["AutomatedSnapshotRetentionPeriod"] == 10 + # assert cluster["ManualSnapshotRetentionPeriod"] == -1 + # assert cluster["ClusterSecurityGroups"] == [] + assert len(cluster["ClusterParameterGroups"]) == 1 param_group = cluster["ClusterParameterGroups"][0] - param_group.should.equal( - { - "ParameterGroupName": "default.redshift-1.0", - "ParameterApplyStatus": "in-sync", - } - ) - # cluster["ClusterSubnetGroupName"].should.equal("default") - cluster["AvailabilityZone"].should.equal("us-east-1d") - cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:11:00") - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["NumberOfNodes"].should.equal(3) - # cluster["PubliclyAccessible"].should.equal(True) - cluster["Encrypted"].should.equal(False) - cluster["EnhancedVpcRouting"].should.equal(False) + assert param_group == { + "ParameterGroupName": "default.redshift-1.0", + "ParameterApplyStatus": "in-sync", + } + # assert cluster["ClusterSubnetGroupName"] == "default" + assert cluster["AvailabilityZone"] == "us-east-1d" + assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:11:00" + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["NumberOfNodes"] == 3 + # assert cluster["PubliclyAccessible"] is True + assert cluster["Encrypted"] is False + assert cluster["EnhancedVpcRouting"] is False cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) + assert cluster["ClusterIdentifier"] == cluster_identifier # AWS returns 'Available' (upper cased) - cluster["ClusterStatus"].should.equal("available") - # cluster["ClusterAvailabilityStatus"].should.equal("Available") - cluster["NodeType"].should.equal("dc1.large") - cluster["MasterUsername"].should.equal("username") - cluster["DBName"].should.equal("my_db") + assert cluster["ClusterStatus"] == "available" + # assert cluster["ClusterAvailabilityStatus"] == "Available" + assert cluster["NodeType"] == "dc1.large" + assert cluster["MasterUsername"] == "username" + assert cluster["DBName"] == "my_db" # AWS returns: ClusterSecurityGroups=[] - # cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"].should.equal("Default") + # assert cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"] == "Default" # AWS returns default sg: [{'VpcSecurityGroupId': 'sg-...', 'Status': 'active'}], - # cluster["VpcSecurityGroups"].should.equal([]) - # cluster["ClusterSubnetGroupName"].should.equal("default") + # assert cluster["VpcSecurityGroups"] == [] + # assert cluster["ClusterSubnetGroupName"] == "default" # AWS returns default VPC ID - # cluster["VpcId"].should.equal("vpc-...") - cluster["AvailabilityZone"].should.equal("us-east-1d") - cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:11:00") - cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal( + # assert cluster["VpcId"] == "vpc-..." + assert cluster["AvailabilityZone"] == "us-east-1d" + assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:11:00" + assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == ( "default.redshift-1.0" ) - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(10) + assert cluster["AutomatedSnapshotRetentionPeriod"] == 10 # Endpoint only returned when ClusterStatus=Available - cluster["Endpoint"]["Address"].should.match( - f"{cluster_identifier}.[a-z0-9]+.{region}.redshift.amazonaws.com" + assert re.match( + f"{cluster_identifier}.[a-z0-9]+.{region}.redshift.amazonaws.com", + cluster["Endpoint"]["Address"], ) - cluster["Endpoint"]["Port"].should.equal(1234) - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["NumberOfNodes"].should.equal(3) + assert cluster["Endpoint"]["Port"] == 1234 + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["NumberOfNodes"] == 3 @mock_redshift @@ -237,17 +234,17 @@ def test_create_single_node_cluster_boto3(): DBName="my_db", ClusterType="single-node", )["Cluster"] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NumberOfNodes"].should.equal(1) + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NumberOfNodes"] == 1 cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NodeType"].should.equal("dw.hs1.xlarge") - cluster["MasterUsername"].should.equal("username") - cluster["DBName"].should.equal("my_db") - cluster["NumberOfNodes"].should.equal(1) + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NodeType"] == "dw.hs1.xlarge" + assert cluster["MasterUsername"] == "username" + assert cluster["DBName"] == "my_db" + assert cluster["NumberOfNodes"] == 1 @mock_redshift @@ -273,7 +270,7 @@ def test_create_cluster_in_subnet_group(): cluster_response = client.describe_clusters(ClusterIdentifier="my_cluster") cluster = cluster_response["Clusters"][0] - cluster["ClusterSubnetGroupName"].should.equal("my_subnet_group") + assert cluster["ClusterSubnetGroupName"] == "my_subnet_group" @mock_redshift @@ -299,7 +296,7 @@ def test_create_cluster_in_subnet_group_boto3(): cluster_response = client.describe_clusters(ClusterIdentifier="my_cluster") cluster = cluster_response["Clusters"][0] - cluster["ClusterSubnetGroupName"].should.equal("my_subnet_group") + assert cluster["ClusterSubnetGroupName"] == "my_subnet_group" @mock_redshift @@ -327,7 +324,7 @@ def test_create_cluster_with_security_group_boto3(): group_names = [ group["ClusterSecurityGroupName"] for group in cluster["ClusterSecurityGroups"] ] - set(group_names).should.equal({"security_group1", "security_group2"}) + assert set(group_names) == {"security_group1", "security_group2"} @mock_redshift @@ -350,7 +347,7 @@ def test_create_cluster_with_vpc_security_groups_boto3(): response = client.describe_clusters(ClusterIdentifier=cluster_id) cluster = response["Clusters"][0] group_ids = [group["VpcSecurityGroupId"] for group in cluster["VpcSecurityGroups"]] - list(group_ids).should.equal([security_group.id]) + assert list(group_ids) == [security_group.id] @mock_redshift @@ -368,7 +365,7 @@ def test_create_cluster_with_iam_roles(): response = client.describe_clusters(ClusterIdentifier=cluster_id) cluster = response["Clusters"][0] iam_roles = [role["IamRoleArn"] for role in cluster["IamRoles"]] - iam_roles_arn.should.equal(iam_roles) + assert iam_roles_arn == iam_roles @mock_redshift @@ -380,10 +377,10 @@ def test_create_cluster_with_parameter_group_boto3(): ParameterGroupFamily="redshift-1.0", Description="This is my group", )["ClusterParameterGroup"] - group["ParameterGroupName"].should.equal("my-parameter-group") - group["ParameterGroupFamily"].should.equal("redshift-1.0") - group["Description"].should.equal("This is my group") - group["Tags"].should.equal([]) + assert group["ParameterGroupName"] == "my-parameter-group" + assert group["ParameterGroupFamily"] == "redshift-1.0" + assert group["Description"] == "This is my group" + assert group["Tags"] == [] cluster = client.create_cluster( ClusterIdentifier=cluster_id, @@ -393,19 +390,19 @@ def test_create_cluster_with_parameter_group_boto3(): ClusterType="single-node", ClusterParameterGroupName="my-parameter-group", )["Cluster"] - cluster["ClusterParameterGroups"].should.have.length_of(1) - cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal( + assert len(cluster["ClusterParameterGroups"]) == 1 + assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == ( "my-parameter-group" ) - cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync") + assert cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"] == "in-sync" cluster_response = client.describe_clusters(ClusterIdentifier=cluster_id) cluster = cluster_response["Clusters"][0] - cluster["ClusterParameterGroups"].should.have.length_of(1) - cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal( + assert len(cluster["ClusterParameterGroups"]) == 1 + assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == ( "my-parameter-group" ) - cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync") + assert cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"] == "in-sync" @mock_redshift @@ -414,8 +411,8 @@ def test_describe_non_existent_cluster_boto3(): with pytest.raises(ClientError) as ex: client.describe_clusters(ClusterIdentifier="not-a-cluster") err = ex.value.response["Error"] - err["Code"].should.equal("ClusterNotFound") - err["Message"].should.equal("Cluster not-a-cluster not found.") + assert err["Code"] == "ClusterNotFound" + assert err["Message"] == "Cluster not-a-cluster not found." @mock_redshift @@ -434,7 +431,7 @@ def test_modify_cluster_vpc_routing(): cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["EnhancedVpcRouting"].should.equal(False) + assert cluster["EnhancedVpcRouting"] is False client.create_cluster_security_group( ClusterSecurityGroupName="security_group", Description="security_group" @@ -463,14 +460,14 @@ def test_modify_cluster_vpc_routing(): cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NodeType"].should.equal("ds2.8xlarge") - cluster["PreferredMaintenanceWindow"].should.equal("Tue:03:00-Tue:11:00") - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(7) - cluster["AllowVersionUpgrade"].should.equal(False) + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NodeType"] == "ds2.8xlarge" + assert cluster["PreferredMaintenanceWindow"] == "Tue:03:00-Tue:11:00" + assert cluster["AutomatedSnapshotRetentionPeriod"] == 7 + assert cluster["AllowVersionUpgrade"] is False # This one should remain unmodified. - cluster["NumberOfNodes"].should.equal(3) - cluster["EnhancedVpcRouting"].should.equal(True) + assert cluster["NumberOfNodes"] == 3 + assert cluster["EnhancedVpcRouting"] is True @mock_redshift @@ -496,7 +493,7 @@ def test_modify_cluster_boto3(): cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["EnhancedVpcRouting"].should.equal(False) + assert cluster["EnhancedVpcRouting"] is False client.modify_cluster( ClusterIdentifier=cluster_identifier, @@ -514,18 +511,18 @@ def test_modify_cluster_boto3(): cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = cluster_response["Clusters"][0] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NodeType"].should.equal("dw.hs1.xlarge") - cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"].should.equal( + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NodeType"] == "dw.hs1.xlarge" + assert cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"] == ( "security_group" ) - cluster["PreferredMaintenanceWindow"].should.equal("Tue:03:00-Tue:11:00") - cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal( + assert cluster["PreferredMaintenanceWindow"] == "Tue:03:00-Tue:11:00" + assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == ( "my_parameter_group" ) - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(7) - cluster["AllowVersionUpgrade"].should.equal(False) - cluster["NumberOfNodes"].should.equal(4) + assert cluster["AutomatedSnapshotRetentionPeriod"] == 7 + assert cluster["AllowVersionUpgrade"] is False + assert cluster["NumberOfNodes"] == 4 @mock_redshift @@ -548,10 +545,10 @@ def test_create_cluster_subnet_group(): ) my_subnet = subnets_response["ClusterSubnetGroups"][0] - my_subnet["ClusterSubnetGroupName"].should.equal("my_subnet_group") - my_subnet["Description"].should.equal("This is my subnet group") + assert my_subnet["ClusterSubnetGroupName"] == "my_subnet_group" + assert my_subnet["Description"] == "This is my subnet group" subnet_ids = [subnet["SubnetIdentifier"] for subnet in my_subnet["Subnets"]] - set(subnet_ids).should.equal(set([subnet1.id, subnet2.id])) + assert set(subnet_ids) == set([subnet1.id, subnet2.id]) @mock_redshift @@ -617,8 +614,8 @@ def test_create_invalid_cluster_subnet_group_boto3(): SubnetIds=["subnet-1234"], ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidSubnet") - err["Message"].should.match(r"Subnet \[[a-z0-9-']+\] not found.") + assert err["Code"] == "InvalidSubnet" + assert re.match(r"Subnet \[[a-z0-9-']+\] not found.", err["Message"]) @mock_redshift @@ -628,8 +625,8 @@ def test_describe_non_existent_subnet_group_boto3(): with pytest.raises(ClientError) as ex: client.describe_cluster_subnet_groups(ClusterSubnetGroupName="my_subnet") err = ex.value.response["Error"] - err["Code"].should.equal("ClusterSubnetGroupNotFound") - err["Message"].should.equal("Subnet group my_subnet not found.") + assert err["Code"] == "ClusterSubnetGroupNotFound" + assert err["Message"] == "Subnet group my_subnet not found." @mock_redshift @@ -648,18 +645,17 @@ def test_delete_cluster_subnet_group(): subnets_response = client.describe_cluster_subnet_groups() subnets = subnets_response["ClusterSubnetGroups"] - subnets.should.have.length_of(1) + assert len(subnets) == 1 client.delete_cluster_subnet_group(ClusterSubnetGroupName="my_subnet_group") subnets_response = client.describe_cluster_subnet_groups() subnets = subnets_response["ClusterSubnetGroups"] - subnets.should.have.length_of(0) + assert len(subnets) == 0 # Delete invalid id - client.delete_cluster_subnet_group.when.called_with( - ClusterSubnetGroupName="not-a-subnet-group" - ).should.throw(ClientError) + with pytest.raises(ClientError): + client.delete_cluster_subnet_group(ClusterSubnetGroupName="not-a-subnet-group") @mock_redshift @@ -670,22 +666,22 @@ def test_create_cluster_security_group_boto3(): Description="This is my security group", Tags=[{"Key": "tag_key", "Value": "tag_value"}], )["ClusterSecurityGroup"] - group["ClusterSecurityGroupName"].should.equal("my_security_group") - group["Description"].should.equal("This is my security group") - group["EC2SecurityGroups"].should.equal([]) - group["IPRanges"].should.equal([]) - group["Tags"].should.equal([{"Key": "tag_key", "Value": "tag_value"}]) + assert group["ClusterSecurityGroupName"] == "my_security_group" + assert group["Description"] == "This is my security group" + assert group["EC2SecurityGroups"] == [] + assert group["IPRanges"] == [] + assert group["Tags"] == [{"Key": "tag_key", "Value": "tag_value"}] groups_response = client.describe_cluster_security_groups( ClusterSecurityGroupName="my_security_group" ) my_group = groups_response["ClusterSecurityGroups"][0] - my_group["ClusterSecurityGroupName"].should.equal("my_security_group") - my_group["Description"].should.equal("This is my security group") - my_group["EC2SecurityGroups"].should.equal([]) - my_group["IPRanges"].should.equal([]) - my_group["Tags"].should.equal([{"Key": "tag_key", "Value": "tag_value"}]) + assert my_group["ClusterSecurityGroupName"] == "my_security_group" + assert my_group["Description"] == "This is my security group" + assert my_group["EC2SecurityGroups"] == [] + assert my_group["IPRanges"] == [] + assert my_group["Tags"] == [{"Key": "tag_key", "Value": "tag_value"}] @mock_redshift @@ -695,8 +691,8 @@ def test_describe_non_existent_security_group_boto3(): with pytest.raises(ClientError) as ex: client.describe_cluster_security_groups(ClusterSecurityGroupName="non-existent") err = ex.value.response["Error"] - err["Code"].should.equal("ClusterSecurityGroupNotFound") - err["Message"].should.equal("Security group non-existent not found.") + assert err["Code"] == "ClusterSecurityGroupNotFound" + assert err["Message"] == "Security group non-existent not found." @mock_redshift @@ -708,12 +704,12 @@ def test_delete_cluster_security_group_boto3(): ) groups = client.describe_cluster_security_groups()["ClusterSecurityGroups"] - groups.should.have.length_of(2) # The default group already exists + assert len(groups) == 2 # The default group already exists client.delete_cluster_security_group(ClusterSecurityGroupName="my_security_group") groups = client.describe_cluster_security_groups()["ClusterSecurityGroups"] - groups.should.have.length_of(1) + assert len(groups) == 1 # Delete invalid id with pytest.raises(ClientError) as ex: @@ -721,8 +717,8 @@ def test_delete_cluster_security_group_boto3(): ClusterSecurityGroupName="not-a-security-group" ) err = ex.value.response["Error"] - err["Code"].should.equal("ClusterSecurityGroupNotFound") - err["Message"].should.equal("Security group not-a-security-group not found.") + assert err["Code"] == "ClusterSecurityGroupNotFound" + assert err["Message"] == "Security group not-a-security-group not found." @mock_redshift @@ -733,19 +729,19 @@ def test_create_cluster_parameter_group_boto3(): ParameterGroupFamily="redshift-1.0", Description="This is my group", )["ClusterParameterGroup"] - group["ParameterGroupName"].should.equal("my-parameter-group") - group["ParameterGroupFamily"].should.equal("redshift-1.0") - group["Description"].should.equal("This is my group") - group["Tags"].should.equal([]) + assert group["ParameterGroupName"] == "my-parameter-group" + assert group["ParameterGroupFamily"] == "redshift-1.0" + assert group["Description"] == "This is my group" + assert group["Tags"] == [] groups_response = client.describe_cluster_parameter_groups( ParameterGroupName="my-parameter-group" ) my_group = groups_response["ParameterGroups"][0] - my_group["ParameterGroupName"].should.equal("my-parameter-group") - my_group["ParameterGroupFamily"].should.equal("redshift-1.0") - my_group["Description"].should.equal("This is my group") + assert my_group["ParameterGroupName"] == "my-parameter-group" + assert my_group["ParameterGroupFamily"] == "redshift-1.0" + assert my_group["Description"] == "This is my group" @mock_redshift @@ -756,8 +752,8 @@ def test_describe_non_existent_parameter_group_boto3(): ParameterGroupName="not-a-parameter-group" ) err = ex.value.response["Error"] - err["Code"].should.equal("ClusterParameterGroupNotFound") - err["Message"].should.equal("Parameter group not-a-parameter-group not found.") + assert err["Code"] == "ClusterParameterGroupNotFound" + assert err["Message"] == "Parameter group not-a-parameter-group not found." @mock_redshift @@ -768,34 +764,34 @@ def test_delete_parameter_group_boto3(): ParameterGroupFamily="redshift-1.0", Description="This is my group", ) - client.describe_cluster_parameter_groups()["ParameterGroups"].should.have.length_of( - 2 - ) + assert len(client.describe_cluster_parameter_groups()["ParameterGroups"]) == 2 x = client.delete_cluster_parameter_group(ParameterGroupName="my-parameter-group") del x["ResponseMetadata"] - x.should.equal({}) + assert x == {} with pytest.raises(ClientError) as ex: client.delete_cluster_parameter_group(ParameterGroupName="my-parameter-group") err = ex.value.response["Error"] - err["Code"].should.equal("ClusterParameterGroupNotFound") + assert err["Code"] == "ClusterParameterGroupNotFound" # BUG: This is what AWS returns - # err["Message"].should.equal("ParameterGroup not found: my-parameter-group") - err["Message"].should.equal("Parameter group my-parameter-group not found.") + # assert err["Message"] == "ParameterGroup not found: my-parameter-group" + assert err["Message"] == "Parameter group my-parameter-group not found." - client.describe_cluster_parameter_groups()["ParameterGroups"].should.have.length_of( - 1 - ) + assert len(client.describe_cluster_parameter_groups()["ParameterGroups"]) == 1 @mock_redshift def test_create_cluster_snapshot_of_non_existent_cluster(): client = boto3.client("redshift", region_name="us-east-1") cluster_identifier = "non-existent-cluster-id" - client.create_cluster_snapshot.when.called_with( - SnapshotIdentifier="snapshot-id", ClusterIdentifier=cluster_identifier - ).should.throw(ClientError, f"Cluster {cluster_identifier} not found.") + with pytest.raises(ClientError) as client_err: + client.create_cluster_snapshot( + SnapshotIdentifier="snapshot-id", ClusterIdentifier=cluster_identifier + ) + assert client_err.value.response["Error"]["Message"] == ( + f"Cluster {cluster_identifier} not found." + ) @mock_redshift @@ -814,15 +810,15 @@ def test_automated_snapshot_on_cluster_creation(): Tags=[{"Key": "tag_key", "Value": "tag_value"}], ) - cluster_response["Cluster"]["Tags"].should.equal( - [{"Key": "tag_key", "Value": "tag_value"}] - ) + assert cluster_response["Cluster"]["Tags"] == [ + {"Key": "tag_key", "Value": "tag_value"} + ] resp_auto_snap = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier ) - resp_auto_snap["Snapshots"][0]["SnapshotType"].should.equal("automated") + assert resp_auto_snap["Snapshots"][0]["SnapshotType"] == "automated" # Tags from cluster are not copied over to automated snapshot - resp_auto_snap["Snapshots"][0]["Tags"].should.equal([]) + assert resp_auto_snap["Snapshots"][0]["Tags"] == [] @mock_redshift @@ -839,17 +835,17 @@ def test_delete_automated_snapshot(): MasterUserPassword="password", EnhancedVpcRouting=True, ) - cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge") + assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge" resp_auto_snap = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier ) snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"] # Delete automated snapshot should result in error - client.delete_cluster_snapshot.when.called_with( - SnapshotIdentifier=snapshot_identifier - ).should.throw( - ClientError, - f"Cannot delete the snapshot {snapshot_identifier} because only manual snapshots may be deleted", + with pytest.raises(ClientError) as client_err: + client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier) + assert client_err.value.response["Error"]["Message"] == ( + f"Cannot delete the snapshot {snapshot_identifier} because only " + "manual snapshots may be deleted" ) @@ -867,18 +863,18 @@ def test_presence_automated_snapshot_on_cluster_delete(): ) # Ensure automated snapshot is available resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) - resp["Snapshots"].should.have.length_of(1) + assert len(resp["Snapshots"]) == 1 # Delete the cluster cluster_response = client.delete_cluster( ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True ) cluster = cluster_response["Cluster"] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) + assert cluster["ClusterIdentifier"] == cluster_identifier # Ensure Automated snapshot is deleted resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) - resp["Snapshots"].should.have.length_of(0) + assert len(resp["Snapshots"]) == 0 @mock_redshift @@ -896,7 +892,7 @@ def test_describe_snapshot_with_filter(): MasterUserPassword="password", EnhancedVpcRouting=True, ) - cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge") + assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge" resp_auto_snap = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier, SnapshotType="automated" ) @@ -908,30 +904,38 @@ def test_describe_snapshot_with_filter(): resp = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier, SnapshotType="automated" ) - resp["Snapshots"].should.have.length_of(1) + assert len(resp["Snapshots"]) == 1 resp = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier, SnapshotType="manual" ) - resp["Snapshots"].should.have.length_of(1) + assert len(resp["Snapshots"]) == 1 resp = client.describe_cluster_snapshots( SnapshotIdentifier=snapshot_identifier, SnapshotType="manual" ) - resp["Snapshots"].should.have.length_of(1) + assert len(resp["Snapshots"]) == 1 resp = client.describe_cluster_snapshots( SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="automated" ) - resp["Snapshots"].should.have.length_of(1) + assert len(resp["Snapshots"]) == 1 - client.describe_cluster_snapshots.when.called_with( - SnapshotIdentifier=snapshot_identifier, SnapshotType="automated" - ).should.throw(ClientError, f"Snapshot {snapshot_identifier} not found.") + with pytest.raises(ClientError) as client_err: + client.describe_cluster_snapshots( + SnapshotIdentifier=snapshot_identifier, SnapshotType="automated" + ) + assert client_err.value.response["Error"]["Message"] == ( + f"Snapshot {snapshot_identifier} not found." + ) - client.describe_cluster_snapshots.when.called_with( - SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="manual" - ).should.throw(ClientError, f"Snapshot {auto_snapshot_identifier} not found.") + with pytest.raises(ClientError) as client_err: + client.describe_cluster_snapshots( + SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="manual" + ) + assert client_err.value.response["Error"]["Message"] == ( + f"Snapshot {auto_snapshot_identifier} not found." + ) @mock_redshift @@ -953,30 +957,32 @@ def test_create_cluster_from_automated_snapshot(): ClusterIdentifier=original_cluster_identifier, SnapshotType="automated" ) auto_snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"] - client.restore_from_cluster_snapshot.when.called_with( - ClusterIdentifier=original_cluster_identifier, - SnapshotIdentifier=auto_snapshot_identifier, - ).should.throw(ClientError, "ClusterAlreadyExists") + with pytest.raises(ClientError) as client_err: + client.restore_from_cluster_snapshot( + ClusterIdentifier=original_cluster_identifier, + SnapshotIdentifier=auto_snapshot_identifier, + ) + assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists" response = client.restore_from_cluster_snapshot( ClusterIdentifier=new_cluster_identifier, SnapshotIdentifier=auto_snapshot_identifier, Port=1234, ) - response["Cluster"]["ClusterStatus"].should.equal("creating") + assert response["Cluster"]["ClusterStatus"] == "creating" response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier) new_cluster = response["Clusters"][0] - new_cluster["NodeType"].should.equal("ds2.xlarge") - new_cluster["MasterUsername"].should.equal("username") - new_cluster["Endpoint"]["Port"].should.equal(1234) - new_cluster["EnhancedVpcRouting"].should.equal(True) + assert new_cluster["NodeType"] == "ds2.xlarge" + assert new_cluster["MasterUsername"] == "username" + assert new_cluster["Endpoint"]["Port"] == 1234 + assert new_cluster["EnhancedVpcRouting"] is True # Make sure the new cluster has automated snapshot on cluster creation resp_auto_snap = client.describe_cluster_snapshots( ClusterIdentifier=new_cluster_identifier, SnapshotType="automated" ) - resp_auto_snap["Snapshots"].should.have.length_of(1) + assert len(resp_auto_snap["Snapshots"]) == 1 @mock_redshift @@ -994,7 +1000,7 @@ def test_create_cluster_snapshot(): MasterUserPassword="password", EnhancedVpcRouting=True, ) - cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge") + assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge" snapshot_response = client.create_cluster_snapshot( SnapshotIdentifier=snapshot_identifier, @@ -1002,11 +1008,11 @@ def test_create_cluster_snapshot(): Tags=[{"Key": "test-tag-key", "Value": "test-tag-value"}], ) snapshot = snapshot_response["Snapshot"] - snapshot["SnapshotIdentifier"].should.equal(snapshot_identifier) - snapshot["ClusterIdentifier"].should.equal(cluster_identifier) - snapshot["NumberOfNodes"].should.equal(1) - snapshot["NodeType"].should.equal("ds2.xlarge") - snapshot["MasterUsername"].should.equal("username") + assert snapshot["SnapshotIdentifier"] == snapshot_identifier + assert snapshot["ClusterIdentifier"] == cluster_identifier + assert snapshot["NumberOfNodes"] == 1 + assert snapshot["NodeType"] == "ds2.xlarge" + assert snapshot["MasterUsername"] == "username" @mock_redshift @@ -1036,27 +1042,27 @@ def test_describe_cluster_snapshots(): SnapshotIdentifier=snapshot_identifier_1 ) snapshot_1 = resp_snap_1["Snapshots"][0] - snapshot_1["SnapshotIdentifier"].should.equal(snapshot_identifier_1) - snapshot_1["ClusterIdentifier"].should.equal(cluster_identifier) - snapshot_1["NumberOfNodes"].should.equal(1) - snapshot_1["NodeType"].should.equal("ds2.xlarge") - snapshot_1["MasterUsername"].should.equal("username") + assert snapshot_1["SnapshotIdentifier"] == snapshot_identifier_1 + assert snapshot_1["ClusterIdentifier"] == cluster_identifier + assert snapshot_1["NumberOfNodes"] == 1 + assert snapshot_1["NodeType"] == "ds2.xlarge" + assert snapshot_1["MasterUsername"] == "username" resp_snap_2 = client.describe_cluster_snapshots( SnapshotIdentifier=snapshot_identifier_2 ) snapshot_2 = resp_snap_2["Snapshots"][0] - snapshot_2["SnapshotIdentifier"].should.equal(snapshot_identifier_2) - snapshot_2["ClusterIdentifier"].should.equal(cluster_identifier) - snapshot_2["NumberOfNodes"].should.equal(1) - snapshot_2["NodeType"].should.equal("ds2.xlarge") - snapshot_2["MasterUsername"].should.equal("username") + assert snapshot_2["SnapshotIdentifier"] == snapshot_identifier_2 + assert snapshot_2["ClusterIdentifier"] == cluster_identifier + assert snapshot_2["NumberOfNodes"] == 1 + assert snapshot_2["NodeType"] == "ds2.xlarge" + assert snapshot_2["MasterUsername"] == "username" resp_clust = client.describe_cluster_snapshots( ClusterIdentifier=cluster_identifier, SnapshotType="manual" ) - resp_clust["Snapshots"][0].should.equal(resp_snap_1["Snapshots"][0]) - resp_clust["Snapshots"][1].should.equal(resp_snap_2["Snapshots"][0]) + assert resp_clust["Snapshots"][0] == resp_snap_1["Snapshots"][0] + assert resp_clust["Snapshots"][1] == resp_snap_2["Snapshots"][0] @mock_redshift @@ -1066,11 +1072,13 @@ def test_describe_cluster_snapshots_not_found_error(): snapshot_identifier = "non-existent-snapshot-id" resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) - resp["Snapshots"].should.have.length_of(0) + assert len(resp["Snapshots"]) == 0 - client.describe_cluster_snapshots.when.called_with( - SnapshotIdentifier=snapshot_identifier - ).should.throw(ClientError, f"Snapshot {snapshot_identifier} not found.") + with pytest.raises(ClientError) as client_err: + client.describe_cluster_snapshots(SnapshotIdentifier=snapshot_identifier) + assert client_err.value.response["Error"]["Message"] == ( + f"Snapshot {snapshot_identifier} not found." + ) @mock_redshift @@ -1091,19 +1099,24 @@ def test_delete_cluster_snapshot(): ) snapshots = client.describe_cluster_snapshots()["Snapshots"] - list(snapshots).should.have.length_of(2) + assert len(list(snapshots)) == 2 - client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)["Snapshot"][ - "Status" - ].should.equal("deleted") + assert ( + client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)[ + "Snapshot" + ]["Status"] + == "deleted" + ) snapshots = client.describe_cluster_snapshots()["Snapshots"] - list(snapshots).should.have.length_of(1) + assert len(list(snapshots)) == 1 # Delete invalid id - client.delete_cluster_snapshot.when.called_with( - SnapshotIdentifier="non-existent" - ).should.throw(ClientError, "Snapshot non-existent not found.") + with pytest.raises(ClientError) as client_err: + client.delete_cluster_snapshot(SnapshotIdentifier="non-existent") + assert client_err.value.response["Error"]["Message"] == ( + "Snapshot non-existent not found." + ) @mock_redshift @@ -1125,9 +1138,14 @@ def test_cluster_snapshot_already_exists(): SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier ) - client.create_cluster_snapshot.when.called_with( - SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier - ).should.throw(ClientError, f"{snapshot_identifier} already exists") + with pytest.raises(ClientError) as client_err: + client.create_cluster_snapshot( + SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier + ) + assert ( + f"{snapshot_identifier} already exists" + in client_err.value.response["Error"]["Message"] + ) @mock_redshift @@ -1151,24 +1169,26 @@ def test_create_cluster_from_snapshot(): ClusterIdentifier=original_cluster_identifier, ) - client.restore_from_cluster_snapshot.when.called_with( - ClusterIdentifier=original_cluster_identifier, - SnapshotIdentifier=original_snapshot_identifier, - ).should.throw(ClientError, "ClusterAlreadyExists") + with pytest.raises(ClientError) as client_err: + client.restore_from_cluster_snapshot( + ClusterIdentifier=original_cluster_identifier, + SnapshotIdentifier=original_snapshot_identifier, + ) + assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists" response = client.restore_from_cluster_snapshot( ClusterIdentifier=new_cluster_identifier, SnapshotIdentifier=original_snapshot_identifier, Port=1234, ) - response["Cluster"]["ClusterStatus"].should.equal("creating") + assert response["Cluster"]["ClusterStatus"] == "creating" response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier) new_cluster = response["Clusters"][0] - new_cluster["NodeType"].should.equal("ds2.xlarge") - new_cluster["MasterUsername"].should.equal("username") - new_cluster["Endpoint"]["Port"].should.equal(1234) - new_cluster["EnhancedVpcRouting"].should.equal(True) + assert new_cluster["NodeType"] == "ds2.xlarge" + assert new_cluster["MasterUsername"] == "username" + assert new_cluster["Endpoint"]["Port"] == 1234 + assert new_cluster["EnhancedVpcRouting"] is True @mock_redshift @@ -1193,10 +1213,12 @@ def test_create_cluster_with_node_type_from_snapshot(): ClusterIdentifier=original_cluster_identifier, ) - client.restore_from_cluster_snapshot.when.called_with( - ClusterIdentifier=original_cluster_identifier, - SnapshotIdentifier=original_snapshot_identifier, - ).should.throw(ClientError, "ClusterAlreadyExists") + with pytest.raises(ClientError) as client_err: + client.restore_from_cluster_snapshot( + ClusterIdentifier=original_cluster_identifier, + SnapshotIdentifier=original_snapshot_identifier, + ) + assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists" response = client.restore_from_cluster_snapshot( ClusterIdentifier=new_cluster_identifier, @@ -1204,14 +1226,14 @@ def test_create_cluster_with_node_type_from_snapshot(): NodeType="ra3.xlplus", NumberOfNodes=3, ) - response["Cluster"]["ClusterStatus"].should.equal("creating") + assert response["Cluster"]["ClusterStatus"] == "creating" response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier) new_cluster = response["Clusters"][0] - new_cluster["NodeType"].should.equal("ra3.xlplus") - new_cluster["NumberOfNodes"].should.equal(3) - new_cluster["MasterUsername"].should.equal("username") - new_cluster["EnhancedVpcRouting"].should.equal(True) + assert new_cluster["NodeType"] == "ra3.xlplus" + assert new_cluster["NumberOfNodes"] == 3 + assert new_cluster["MasterUsername"] == "username" + assert new_cluster["EnhancedVpcRouting"] is True @mock_redshift @@ -1238,7 +1260,7 @@ def test_create_cluster_from_snapshot_with_waiter(): SnapshotIdentifier=original_snapshot_identifier, Port=1234, ) - response["Cluster"]["ClusterStatus"].should.equal("creating") + assert response["Cluster"]["ClusterStatus"] == "creating" client.get_waiter("cluster_restored").wait( ClusterIdentifier=new_cluster_identifier, @@ -1247,18 +1269,22 @@ def test_create_cluster_from_snapshot_with_waiter(): response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier) new_cluster = response["Clusters"][0] - new_cluster["NodeType"].should.equal("ds2.xlarge") - new_cluster["MasterUsername"].should.equal("username") - new_cluster["EnhancedVpcRouting"].should.equal(True) - new_cluster["Endpoint"]["Port"].should.equal(1234) + assert new_cluster["NodeType"] == "ds2.xlarge" + assert new_cluster["MasterUsername"] == "username" + assert new_cluster["EnhancedVpcRouting"] is True + assert new_cluster["Endpoint"]["Port"] == 1234 @mock_redshift def test_create_cluster_from_non_existent_snapshot(): client = boto3.client("redshift", region_name="us-east-1") - client.restore_from_cluster_snapshot.when.called_with( - ClusterIdentifier="cluster-id", SnapshotIdentifier="non-existent-snapshot" - ).should.throw(ClientError, "Snapshot non-existent-snapshot not found.") + with pytest.raises(ClientError) as client_err: + client.restore_from_cluster_snapshot( + ClusterIdentifier="cluster-id", SnapshotIdentifier="non-existent-snapshot" + ) + assert client_err.value.response["Error"]["Message"] == ( + "Snapshot non-existent-snapshot not found." + ) @mock_redshift @@ -1273,10 +1299,10 @@ def test_create_cluster_status_update(): MasterUsername="username", MasterUserPassword="password", ) - response["Cluster"]["ClusterStatus"].should.equal("creating") + assert response["Cluster"]["ClusterStatus"] == "creating" response = client.describe_clusters(ClusterIdentifier=cluster_identifier) - response["Clusters"][0]["ClusterStatus"].should.equal("available") + assert response["Clusters"][0]["ClusterStatus"] == "available" @mock_redshift @@ -1287,7 +1313,10 @@ def test_describe_tags_with_resource_type(): f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:{cluster_identifier}" ) snapshot_identifier = "my_snapshot" - snapshot_arn = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot:{cluster_identifier}/{snapshot_identifier}" + snapshot_arn = ( + f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot" + f":{cluster_identifier}/{snapshot_identifier}" + ) tag_key = "test-tag-key" tag_value = "test-tag-value" @@ -1302,12 +1331,12 @@ def test_describe_tags_with_resource_type(): ) tags_response = client.describe_tags(ResourceType="cluster") tagged_resources = tags_response["TaggedResources"] - list(tagged_resources).should.have.length_of(1) - tagged_resources[0]["ResourceType"].should.equal("cluster") - tagged_resources[0]["ResourceName"].should.equal(cluster_arn) + assert len(list(tagged_resources)) == 1 + assert tagged_resources[0]["ResourceType"] == "cluster" + assert tagged_resources[0]["ResourceName"] == cluster_arn tag = tagged_resources[0]["Tag"] - tag["Key"].should.equal(tag_key) - tag["Value"].should.equal(tag_value) + assert tag["Key"] == tag_key + assert tag["Value"] == tag_value client.create_cluster_snapshot( SnapshotIdentifier=snapshot_identifier, @@ -1316,12 +1345,12 @@ def test_describe_tags_with_resource_type(): ) tags_response = client.describe_tags(ResourceType="snapshot") tagged_resources = tags_response["TaggedResources"] - list(tagged_resources).should.have.length_of(1) - tagged_resources[0]["ResourceType"].should.equal("snapshot") - tagged_resources[0]["ResourceName"].should.equal(snapshot_arn) + assert len(list(tagged_resources)) == 1 + assert tagged_resources[0]["ResourceType"] == "snapshot" + assert tagged_resources[0]["ResourceName"] == snapshot_arn tag = tagged_resources[0]["Tag"] - tag["Key"].should.equal(tag_key) - tag["Value"].should.equal(tag_value) + assert tag["Key"] == tag_key + assert tag["Value"] == tag_value @mock_redshift @@ -1329,9 +1358,12 @@ def test_describe_tags_cannot_specify_resource_type_and_resource_name(): client = boto3.client("redshift", region_name="us-east-1") resource_name = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:cluster-id" resource_type = "cluster" - client.describe_tags.when.called_with( - ResourceName=resource_name, ResourceType=resource_type - ).should.throw(ClientError, "using either an ARN or a resource type") + with pytest.raises(ClientError) as client_err: + client.describe_tags(ResourceName=resource_name, ResourceType=resource_type) + assert ( + "using either an ARN or a resource type" + in client_err.value.response["Error"]["Message"] + ) @mock_redshift @@ -1342,7 +1374,10 @@ def test_describe_tags_with_resource_name(): f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:{cluster_identifier}" ) snapshot_identifier = "snapshot-id" - snapshot_arn = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot:{cluster_identifier}/{snapshot_identifier}" + snapshot_arn = ( + f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot" + f":{cluster_identifier}/{snapshot_identifier}" + ) tag_key = "test-tag-key" tag_value = "test-tag-value" @@ -1357,12 +1392,12 @@ def test_describe_tags_with_resource_name(): ) tags_response = client.describe_tags(ResourceName=cluster_arn) tagged_resources = tags_response["TaggedResources"] - list(tagged_resources).should.have.length_of(1) - tagged_resources[0]["ResourceType"].should.equal("cluster") - tagged_resources[0]["ResourceName"].should.equal(cluster_arn) + assert len(list(tagged_resources)) == 1 + assert tagged_resources[0]["ResourceType"] == "cluster" + assert tagged_resources[0]["ResourceName"] == cluster_arn tag = tagged_resources[0]["Tag"] - tag["Key"].should.equal(tag_key) - tag["Value"].should.equal(tag_value) + assert tag["Key"] == tag_key + assert tag["Value"] == tag_value client.create_cluster_snapshot( SnapshotIdentifier=snapshot_identifier, @@ -1371,12 +1406,12 @@ def test_describe_tags_with_resource_name(): ) tags_response = client.describe_tags(ResourceName=snapshot_arn) tagged_resources = tags_response["TaggedResources"] - list(tagged_resources).should.have.length_of(1) - tagged_resources[0]["ResourceType"].should.equal("snapshot") - tagged_resources[0]["ResourceName"].should.equal(snapshot_arn) + assert len(list(tagged_resources)) == 1 + assert tagged_resources[0]["ResourceType"] == "snapshot" + assert tagged_resources[0]["ResourceName"] == snapshot_arn tag = tagged_resources[0]["Tag"] - tag["Key"].should.equal(tag_key) - tag["Value"].should.equal(tag_value) + assert tag["Key"] == tag_key + assert tag["Value"] == tag_value @mock_redshift @@ -1405,9 +1440,9 @@ def test_create_tags(): client.create_tags(ResourceName=cluster_arn, Tags=tags) response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = response["Clusters"][0] - list(cluster["Tags"]).should.have.length_of(num_tags) + assert len(list(cluster["Tags"])) == num_tags response = client.describe_tags(ResourceName=cluster_arn) - list(response["TaggedResources"]).should.have.length_of(num_tags) + assert len(list(response["TaggedResources"])) == num_tags @mock_redshift @@ -1439,9 +1474,9 @@ def test_delete_tags(): ) response = client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster = response["Clusters"][0] - list(cluster["Tags"]).should.have.length_of(1) + assert len(list(cluster["Tags"])) == 1 response = client.describe_tags(ResourceName=cluster_arn) - list(response["TaggedResources"]).should.have.length_of(1) + assert len(list(response["TaggedResources"])) == 1 @mock_ec2 @@ -1452,7 +1487,7 @@ def test_describe_tags_all_resource_types(): subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/24") client = boto3.client("redshift", region_name="us-east-1") response = client.describe_tags() - list(response["TaggedResources"]).should.have.length_of(0) + assert len(list(response["TaggedResources"])) == 0 client.create_cluster_subnet_group( ClusterSubnetGroupName="my_subnet_group", Description="This is my subnet group", @@ -1494,8 +1529,8 @@ def test_describe_tags_all_resource_types(): ] tagged_resources = response["TaggedResources"] returned_types = [resource["ResourceType"] for resource in tagged_resources] - list(tagged_resources).should.have.length_of(len(expected_types)) - set(returned_types).should.equal(set(expected_types)) + assert len(list(tagged_resources)) == len(expected_types) + assert set(returned_types) == set(expected_types) @mock_redshift @@ -1503,21 +1538,28 @@ def test_tagged_resource_not_found_error(): client = boto3.client("redshift", region_name="us-east-1") cluster_arn = "arn:aws:redshift:us-east-1::cluster:fake" - client.describe_tags.when.called_with(ResourceName=cluster_arn).should.throw( - ClientError, "cluster (fake) not found." - ) + with pytest.raises(ClientError) as client_err: + client.describe_tags(ResourceName=cluster_arn) + assert client_err.value.response["Error"]["Message"] == "cluster (fake) not found." snapshot_arn = "arn:aws:redshift:us-east-1::snapshot:cluster-id/snap-id" - client.delete_tags.when.called_with( - ResourceName=snapshot_arn, TagKeys=["test"] - ).should.throw(ClientError, "snapshot (snap-id) not found.") - - client.describe_tags.when.called_with(ResourceType="cluster").should.throw( - ClientError, "resource of type 'cluster' not found." + with pytest.raises(ClientError) as client_err: + client.delete_tags(ResourceName=snapshot_arn, TagKeys=["test"]) + assert ( + client_err.value.response["Error"]["Message"] == "snapshot (snap-id) not found." ) - client.describe_tags.when.called_with(ResourceName="bad:arn").should.throw( - ClientError, "Tagging is not supported for this type of resource" + with pytest.raises(ClientError) as client_err: + client.describe_tags(ResourceType="cluster") + assert client_err.value.response["Error"]["Message"] == ( + "resource of type 'cluster' not found." + ) + + with pytest.raises(ClientError) as client_err: + client.describe_tags(ResourceName="bad:arn") + assert ( + "Tagging is not supported for this type of resource" + in client_err.value.response["Error"]["Message"] ) @@ -1537,10 +1579,10 @@ def test_enable_snapshot_copy(): client.enable_snapshot_copy( ClusterIdentifier="test", DestinationRegion="us-west-2", RetentionPeriod=3 ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.contain( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ( "SnapshotCopyGrantName is required for Snapshot Copy on KMS encrypted clusters." - ) + ) in ex.value.response["Error"]["Message"] with pytest.raises(ClientError) as ex: client.enable_snapshot_copy( ClusterIdentifier="test", @@ -1548,8 +1590,8 @@ def test_enable_snapshot_copy(): RetentionPeriod=3, SnapshotCopyGrantName="invalid-us-east-1-to-us-east-1", ) - ex.value.response["Error"]["Code"].should.equal("UnknownSnapshotCopyRegionFault") - ex.value.response["Error"]["Message"].should.contain("Invalid region us-east-1") + assert ex.value.response["Error"]["Code"] == "UnknownSnapshotCopyRegionFault" + assert "Invalid region us-east-1" in ex.value.response["Error"]["Message"] client.enable_snapshot_copy( ClusterIdentifier="test", DestinationRegion="us-west-2", @@ -1558,9 +1600,9 @@ def test_enable_snapshot_copy(): ) response = client.describe_clusters(ClusterIdentifier="test") cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"] - cluster_snapshot_copy_status["RetentionPeriod"].should.equal(3) - cluster_snapshot_copy_status["DestinationRegion"].should.equal("us-west-2") - cluster_snapshot_copy_status["SnapshotCopyGrantName"].should.equal( + assert cluster_snapshot_copy_status["RetentionPeriod"] == 3 + assert cluster_snapshot_copy_status["DestinationRegion"] == "us-west-2" + assert cluster_snapshot_copy_status["SnapshotCopyGrantName"] == ( "copy-us-east-1-to-us-west-2" ) @@ -1579,8 +1621,8 @@ def test_enable_snapshot_copy_unencrypted(): client.enable_snapshot_copy(ClusterIdentifier="test", DestinationRegion="us-west-2") response = client.describe_clusters(ClusterIdentifier="test") cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"] - cluster_snapshot_copy_status["RetentionPeriod"].should.equal(7) - cluster_snapshot_copy_status["DestinationRegion"].should.equal("us-west-2") + assert cluster_snapshot_copy_status["RetentionPeriod"] == 7 + assert cluster_snapshot_copy_status["DestinationRegion"] == "us-west-2" @mock_redshift @@ -1602,7 +1644,7 @@ def test_disable_snapshot_copy(): ) client.disable_snapshot_copy(ClusterIdentifier="test") response = client.describe_clusters(ClusterIdentifier="test") - response["Clusters"][0].shouldnt.contain("ClusterSnapshotCopyStatus") + assert "ClusterSnapshotCopyStatus" not in response["Clusters"][0] @mock_redshift @@ -1627,7 +1669,7 @@ def test_modify_snapshot_copy_retention_period(): ) response = client.describe_clusters(ClusterIdentifier="test") cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"] - cluster_snapshot_copy_status["RetentionPeriod"].should.equal(5) + assert cluster_snapshot_copy_status["RetentionPeriod"] == 5 @mock_redshift @@ -1642,9 +1684,9 @@ def test_create_duplicate_cluster_fails(): } client = boto3.client("redshift", region_name="us-east-1") client.create_cluster(**kwargs) - client.create_cluster.when.called_with(**kwargs).should.throw( - ClientError, "ClusterAlreadyExists" - ) + with pytest.raises(ClientError) as client_err: + client.create_cluster(**kwargs) + assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists" @mock_redshift @@ -1653,8 +1695,8 @@ def test_delete_cluster_with_final_snapshot(): with pytest.raises(ClientError) as ex: client.delete_cluster(ClusterIdentifier="non-existent") - ex.value.response["Error"]["Code"].should.equal("ClusterNotFound") - ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.") + assert ex.value.response["Error"]["Code"] == "ClusterNotFound" + assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"]) cluster_identifier = "my_cluster" client.create_cluster( @@ -1670,10 +1712,10 @@ def test_delete_cluster_with_final_snapshot(): client.delete_cluster( ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=False ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") - ex.value.response["Error"]["Message"].should.contain( + assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination" + assert ( "FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified." - ) + ) in ex.value.response["Error"]["Message"] snapshot_identifier = "my_snapshot" client.delete_cluster( @@ -1683,14 +1725,14 @@ def test_delete_cluster_with_final_snapshot(): ) resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) - resp["Snapshots"].should.have.length_of(1) - resp["Snapshots"][0]["SnapshotIdentifier"].should.equal(snapshot_identifier) - resp["Snapshots"][0]["SnapshotType"].should.equal("manual") + assert len(resp["Snapshots"]) == 1 + assert resp["Snapshots"][0]["SnapshotIdentifier"] == snapshot_identifier + assert resp["Snapshots"][0]["SnapshotType"] == "manual" with pytest.raises(ClientError) as ex: client.describe_clusters(ClusterIdentifier=cluster_identifier) - ex.value.response["Error"]["Code"].should.equal("ClusterNotFound") - ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.") + assert ex.value.response["Error"]["Code"] == "ClusterNotFound" + assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"]) @mock_redshift @@ -1709,40 +1751,39 @@ def test_delete_cluster_without_final_snapshot(): ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True ) cluster = cluster_response["Cluster"] - cluster["ClusterIdentifier"].should.equal(cluster_identifier) - cluster["NodeType"].should.equal("ds2.xlarge") + assert cluster["ClusterIdentifier"] == cluster_identifier + assert cluster["NodeType"] == "ds2.xlarge" # Bug: This is what AWS returns - # cluster["ClusterStatus"].should.equal("deleting") - cluster["MasterUsername"].should.equal("user") - cluster["DBName"].should.equal("test") + # assert cluster["ClusterStatus"] == "deleting" + assert cluster["MasterUsername"] == "user" + assert cluster["DBName"] == "test" endpoint = cluster["Endpoint"] - endpoint["Address"].should.match( - f"{cluster_identifier}.[a-z0-9]+.us-east-1.redshift.amazonaws.com" + assert re.match( + f"{cluster_identifier}.[a-z0-9]+.us-east-1.redshift.amazonaws.com", + endpoint["Address"], ) - endpoint["Port"].should.equal(5439) - cluster["AutomatedSnapshotRetentionPeriod"].should.equal(1) - cluster["ClusterParameterGroups"].should.have.length_of(1) + assert endpoint["Port"] == 5439 + assert cluster["AutomatedSnapshotRetentionPeriod"] == 1 + assert len(cluster["ClusterParameterGroups"]) == 1 param_group = cluster["ClusterParameterGroups"][0] - param_group.should.equal( - { - "ParameterGroupName": "default.redshift-1.0", - "ParameterApplyStatus": "in-sync", - } - ) - cluster["AvailabilityZone"].should.equal("us-east-1a") - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["NumberOfNodes"].should.equal(1) - cluster["Encrypted"].should.equal(False) - cluster["EnhancedVpcRouting"].should.equal(False) + assert param_group == { + "ParameterGroupName": "default.redshift-1.0", + "ParameterApplyStatus": "in-sync", + } + assert cluster["AvailabilityZone"] == "us-east-1a" + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["NumberOfNodes"] == 1 + assert cluster["Encrypted"] is False + assert cluster["EnhancedVpcRouting"] is False resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) - resp["Snapshots"].should.have.length_of(0) + assert len(resp["Snapshots"]) == 0 with pytest.raises(ClientError) as ex: client.describe_clusters(ClusterIdentifier=cluster_identifier) - ex.value.response["Error"]["Code"].should.equal("ClusterNotFound") - ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.") + assert ex.value.response["Error"]["Code"] == "ClusterNotFound" + assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"]) @mock_redshift @@ -1756,26 +1797,26 @@ def test_resize_cluster(): MasterUsername="user", MasterUserPassword="password", ) - resp["Cluster"]["NumberOfNodes"].should.equal(1) + assert resp["Cluster"]["NumberOfNodes"] == 1 client.modify_cluster( ClusterIdentifier="test", ClusterType="multi-node", NumberOfNodes=2 ) resp = client.describe_clusters(ClusterIdentifier="test") - resp["Clusters"][0]["NumberOfNodes"].should.equal(2) + assert resp["Clusters"][0]["NumberOfNodes"] == 2 client.modify_cluster(ClusterIdentifier="test", ClusterType="single-node") resp = client.describe_clusters(ClusterIdentifier="test") - resp["Clusters"][0]["NumberOfNodes"].should.equal(1) + assert resp["Clusters"][0]["NumberOfNodes"] == 1 with pytest.raises(ClientError) as ex: client.modify_cluster( ClusterIdentifier="test", ClusterType="multi-node", NumberOfNodes=1 ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") - ex.value.response["Error"]["Message"].should.contain( + assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination" + assert ( "Number of nodes for cluster type multi-node must be greater than or equal to 2" - ) + ) in ex.value.response["Error"]["Message"] with pytest.raises(ClientError) as ex: client.modify_cluster( @@ -1783,8 +1824,8 @@ def test_resize_cluster(): ClusterType="invalid-cluster-type", NumberOfNodes=1, ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.contain("Invalid cluster type") + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert "Invalid cluster type" in ex.value.response["Error"]["Message"] @mock_redshift @@ -1795,8 +1836,8 @@ def test_get_cluster_credentials_non_existent_cluster_and_user(): client.get_cluster_credentials( ClusterIdentifier="non-existent", DbUser="some_user" ) - ex.value.response["Error"]["Code"].should.equal("ClusterNotFound") - ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.") + assert ex.value.response["Error"]["Code"] == "ClusterNotFound" + assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"]) @mock_redshift @@ -1818,18 +1859,20 @@ def test_get_cluster_credentials_invalid_duration(): client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=899 ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.contain( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ( "Token duration must be between 900 and 3600 seconds" + in ex.value.response["Error"]["Message"] ) with pytest.raises(ClientError) as ex: client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3601 ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.contain( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ( "Token duration must be between 900 and 3600 seconds" + in ex.value.response["Error"]["Message"] ) @@ -1848,30 +1891,30 @@ def test_get_cluster_credentials(): ) expected_expiration = time.mktime( - (datetime.datetime.now() + datetime.timedelta(0, 900)).timetuple() + (datetime.datetime.now(tzutc()) + datetime.timedelta(0, 900)).timetuple() ) db_user = "some_user" response = client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser=db_user ) - response["DbUser"].should.equal(f"IAM:{db_user}") + assert response["DbUser"] == f"IAM:{db_user}" assert time.mktime((response["Expiration"]).timetuple()) == pytest.approx( expected_expiration ) - response["DbPassword"].should.have.length_of(32) + assert len(response["DbPassword"]) == 32 response = client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser=db_user, AutoCreate=True ) - response["DbUser"].should.equal(f"IAMA:{db_user}") + assert response["DbUser"] == f"IAMA:{db_user}" response = client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser="some_other_user", AutoCreate=False ) - response["DbUser"].should.equal("IAM:some_other_user") + assert response["DbUser"] == "IAM:some_other_user" expected_expiration = time.mktime( - (datetime.datetime.now() + datetime.timedelta(0, 3000)).timetuple() + (datetime.datetime.now(tzutc()) + datetime.timedelta(0, 3000)).timetuple() ) response = client.get_cluster_credentials( ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3000 @@ -1893,17 +1936,17 @@ def test_pause_cluster(): MasterUserPassword="password", ) cluster = response["Cluster"] - cluster["ClusterIdentifier"].should.equal("test") + assert cluster["ClusterIdentifier"] == "test" response = client.pause_cluster(ClusterIdentifier="test") cluster = response["Cluster"] - cluster["ClusterIdentifier"].should.equal("test") + assert cluster["ClusterIdentifier"] == "test" # Verify this call returns all properties - cluster["NodeType"].should.equal("ds2.xlarge") - cluster["ClusterStatus"].should.equal("paused") - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["Endpoint"]["Port"].should.equal(5439) + assert cluster["NodeType"] == "ds2.xlarge" + assert cluster["ClusterStatus"] == "paused" + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["Endpoint"]["Port"] == 5439 @mock_redshift @@ -1913,8 +1956,8 @@ def test_pause_unknown_cluster(): with pytest.raises(ClientError) as exc: client.pause_cluster(ClusterIdentifier="test") err = exc.value.response["Error"] - err["Code"].should.equal("ClusterNotFound") - err["Message"].should.equal("Cluster test not found.") + assert err["Code"] == "ClusterNotFound" + assert err["Message"] == "Cluster test not found." @mock_redshift @@ -1932,13 +1975,13 @@ def test_resume_cluster(): client.pause_cluster(ClusterIdentifier="test") response = client.resume_cluster(ClusterIdentifier="test") cluster = response["Cluster"] - cluster["ClusterIdentifier"].should.equal("test") + assert cluster["ClusterIdentifier"] == "test" # Verify this call returns all properties - cluster["NodeType"].should.equal("ds2.xlarge") - cluster["ClusterStatus"].should.equal("available") - cluster["ClusterVersion"].should.equal("1.0") - cluster["AllowVersionUpgrade"].should.equal(True) - cluster["Endpoint"]["Port"].should.equal(5439) + assert cluster["NodeType"] == "ds2.xlarge" + assert cluster["ClusterStatus"] == "available" + assert cluster["ClusterVersion"] == "1.0" + assert cluster["AllowVersionUpgrade"] is True + assert cluster["Endpoint"]["Port"] == 5439 @mock_redshift @@ -1948,5 +1991,5 @@ def test_resume_unknown_cluster(): with pytest.raises(ClientError) as exc: client.resume_cluster(ClusterIdentifier="test") err = exc.value.response["Error"] - err["Code"].should.equal("ClusterNotFound") - err["Message"].should.equal("Cluster test not found.") + assert err["Code"] == "ClusterNotFound" + assert err["Message"] == "Cluster test not found." diff --git a/tests/test_redshift/test_redshift_cloudformation.py b/tests/test_redshift/test_redshift_cloudformation.py index fdad3093a..837ad9092 100644 --- a/tests/test_redshift/test_redshift_cloudformation.py +++ b/tests/test_redshift/test_redshift_cloudformation.py @@ -1,6 +1,6 @@ -import boto3 import json -import sure # noqa # pylint: disable=unused-import + +import boto3 from moto import mock_cloudformation, mock_ec2, mock_redshift from tests.test_cloudformation.fixtures import redshift @@ -33,20 +33,20 @@ def test_redshift_stack(): cluster_res = redshift_conn.describe_clusters() clusters = cluster_res["Clusters"] - clusters.should.have.length_of(1) + assert len(clusters) == 1 cluster = clusters[0] - cluster["DBName"].should.equal("mydb") - cluster["NumberOfNodes"].should.equal(2) - cluster["NodeType"].should.equal("dw1.xlarge") - cluster["MasterUsername"].should.equal("myuser") - cluster["Endpoint"]["Port"].should.equal(5439) - cluster["VpcSecurityGroups"].should.have.length_of(1) + assert cluster["DBName"] == "mydb" + assert cluster["NumberOfNodes"] == 2 + assert cluster["NodeType"] == "dw1.xlarge" + assert cluster["MasterUsername"] == "myuser" + assert cluster["Endpoint"]["Port"] == 5439 + assert len(cluster["VpcSecurityGroups"]) == 1 security_group_id = cluster["VpcSecurityGroups"][0]["VpcSecurityGroupId"] groups = ec2.describe_security_groups(GroupIds=[security_group_id])[ "SecurityGroups" ] - groups.should.have.length_of(1) + assert len(groups) == 1 group = groups[0] - group["IpPermissions"].should.have.length_of(1) - group["IpPermissions"][0]["IpRanges"][0]["CidrIp"].should.equal("10.0.0.1/16") + assert len(group["IpPermissions"]) == 1 + assert group["IpPermissions"][0]["IpRanges"][0]["CidrIp"] == "10.0.0.1/16" diff --git a/tests/test_redshift/test_server.py b/tests/test_redshift/test_server.py index 463f4e46e..f55ea3cd8 100644 --- a/tests/test_redshift/test_server.py +++ b/tests/test_redshift/test_server.py @@ -1,15 +1,13 @@ -import sure # noqa # pylint: disable=unused-import +"""Test the different server responses.""" import json +import re + import pytest import xmltodict import moto.server as server from moto import mock_redshift -""" -Test the different server responses -""" - @mock_redshift def test_describe_clusters(): @@ -19,7 +17,7 @@ def test_describe_clusters(): res = test_client.get("/?Action=DescribeClusters") result = res.data.decode("utf-8") - result.should.contain("") + assert "" in result @mock_redshift @@ -31,9 +29,9 @@ def test_describe_clusters_with_json_content_type(): result = json.loads(res.data.decode("utf-8")) del result["DescribeClustersResponse"]["ResponseMetadata"] - result.should.equal( - {"DescribeClustersResponse": {"DescribeClustersResult": {"Clusters": []}}} - ) + assert result == { + "DescribeClustersResponse": {"DescribeClustersResult": {"Clusters": []}} + } @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -61,30 +59,29 @@ def test_create_cluster(is_json): result = xmltodict.parse(result, dict_constructor=dict) del result["CreateClusterResponse"]["ResponseMetadata"] - result.should.have.key("CreateClusterResponse") - result["CreateClusterResponse"].should.have.key("CreateClusterResult") - result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster") + assert "CreateClusterResponse" in result + assert "CreateClusterResult" in result["CreateClusterResponse"] + assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"] result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"] - result.should.have.key("MasterUsername").equal("masteruser") - result.should.have.key("MasterUserPassword").equal("****") - result.should.have.key("ClusterVersion").equal("1.0") - result.should.have.key("ClusterSubnetGroupName").equal(None) - result.should.have.key("AvailabilityZone").equal("us-east-1a") - result.should.have.key("ClusterStatus").equal("creating") - result.should.have.key("NumberOfNodes").equal(1 if is_json else "1") - result.should.have.key("PubliclyAccessible").equal(None) - result.should.have.key("Encrypted").equal(None) - result.should.have.key("DBName").equal("dev") - result.should.have.key("NodeType").equal("ds2.xlarge") - result.should.have.key("ClusterIdentifier").equal("examplecluster") - result.should.have.key("Endpoint").should.have.key("Address").match( - "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com" + assert result["MasterUsername"] == "masteruser" + assert result["MasterUserPassword"] == "****" + assert result["ClusterVersion"] == "1.0" + assert result["ClusterSubnetGroupName"] is None + assert result["AvailabilityZone"] == "us-east-1a" + assert result["ClusterStatus"] == "creating" + assert result["NumberOfNodes"] == (1 if is_json else "1") + assert result["PubliclyAccessible"] is None + assert result["Encrypted"] is None + assert result["DBName"] == "dev" + assert result["NodeType"] == "ds2.xlarge" + assert result["ClusterIdentifier"] == "examplecluster" + assert re.match( + "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com", + result["Endpoint"]["Address"], ) - result.should.have.key("Endpoint").should.have.key("Port").equal( - 5439 if is_json else "5439" - ) - result.should.have.key("ClusterCreateTime") + assert result["Endpoint"]["Port"] == (5439 if is_json else "5439") + assert "ClusterCreateTime" in result @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -122,37 +119,34 @@ def test_create_cluster_multiple_params(is_json): result = xmltodict.parse(result, dict_constructor=dict) del result["CreateClusterResponse"]["ResponseMetadata"] - result.should.have.key("CreateClusterResponse") - result["CreateClusterResponse"].should.have.key("CreateClusterResult") - result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster") + assert "CreateClusterResponse" in result + assert "CreateClusterResult" in result["CreateClusterResponse"] + assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"] result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"] - result.should.have.key("MasterUsername").equal("masteruser") - result.should.have.key("MasterUserPassword").equal("****") - result.should.have.key("ClusterVersion").equal("2.0") - result.should.have.key("ClusterSubnetGroupName").equal(None) - result.should.have.key("AvailabilityZone").equal("us-east-1a") - result.should.have.key("ClusterStatus").equal("creating") - result.should.have.key("NumberOfNodes").equal(3 if is_json else "3") - result.should.have.key("PubliclyAccessible").equal(None) - result.should.have.key("Encrypted").equal("True") - result.should.have.key("DBName").equal("testdb") - result.should.have.key("NodeType").equal("ds2.xlarge") - result.should.have.key("ClusterIdentifier").equal("examplecluster") - result.should.have.key("Endpoint").should.have.key("Address").match( - "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com" + assert result["MasterUsername"] == "masteruser" + assert result["MasterUserPassword"] == "****" + assert result["ClusterVersion"] == "2.0" + assert result["ClusterSubnetGroupName"] is None + assert result["AvailabilityZone"] == "us-east-1a" + assert result["ClusterStatus"] == "creating" + assert result["NumberOfNodes"] == (3 if is_json else "3") + assert result["PubliclyAccessible"] is None + assert result["Encrypted"] == "True" + assert result["DBName"] == "testdb" + assert result["NodeType"] == "ds2.xlarge" + assert result["ClusterIdentifier"] == "examplecluster" + assert re.match( + "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com", + result["Endpoint"]["Address"], ) - result.should.have.key("Endpoint").should.have.key("Port").equal( - 1234 if is_json else "1234" - ) - result.should.have.key("ClusterCreateTime") - result.should.have.key("Tags") + assert result["Endpoint"]["Port"] == (1234 if is_json else "1234") + assert "ClusterCreateTime" in result + assert "Tags" in result tags = result["Tags"] if not is_json: tags = tags["item"] - tags.should.equal( - [{"Key": "key1", "Value": "val1"}, {"Key": "key2", "Value": "val2"}] - ) + assert tags == [{"Key": "key1", "Value": "val1"}, {"Key": "key2", "Value": "val2"}] @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -185,18 +179,16 @@ def test_create_and_describe_clusters(is_json): result = xmltodict.parse(result, dict_constructor=dict) del result["DescribeClustersResponse"]["ResponseMetadata"] - result.should.have.key("DescribeClustersResponse") - result["DescribeClustersResponse"].should.have.key("DescribeClustersResult") - result["DescribeClustersResponse"]["DescribeClustersResult"].should.have.key( - "Clusters" - ) + assert "DescribeClustersResponse" in result + assert "DescribeClustersResult" in result["DescribeClustersResponse"] + assert "Clusters" in result["DescribeClustersResponse"]["DescribeClustersResult"] result = result["DescribeClustersResponse"]["DescribeClustersResult"]["Clusters"] if not is_json: result = result["item"] - result.should.have.length_of(2) + assert len(result) == 2 for cluster in result: - cluster_names.should.contain(cluster["ClusterIdentifier"]) + assert cluster["ClusterIdentifier"] in cluster_names @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -233,12 +225,12 @@ def test_create_and_describe_cluster_security_group(is_json): groups = groups["item"] descriptions = [g["Description"] for g in groups] - descriptions.should.contain("desc for csg1") - descriptions.should.contain("desc for csg2") + assert "desc for csg1" in descriptions + assert "desc for csg2" in descriptions # Describe single SG describe_params = ( - "/?Action=DescribeClusterSecurityGroups" "&ClusterSecurityGroupName=csg1" + "/?Action=DescribeClusterSecurityGroups&ClusterSecurityGroupName=csg1" ) if is_json: describe_params += "&ContentType=JSON" @@ -255,10 +247,10 @@ def test_create_and_describe_cluster_security_group(is_json): ]["ClusterSecurityGroups"] if is_json: - groups.should.have.length_of(1) - groups[0]["ClusterSecurityGroupName"].should.equal("csg1") + assert len(groups) == 1 + assert groups[0]["ClusterSecurityGroupName"] == "csg1" else: - groups["item"]["ClusterSecurityGroupName"].should.equal("csg1") + assert groups["item"]["ClusterSecurityGroupName"] == "csg1" @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -268,13 +260,13 @@ def test_describe_unknown_cluster_security_group(is_json): test_client = backend.test_client() describe_params = ( - "/?Action=DescribeClusterSecurityGroups" "&ClusterSecurityGroupName=unknown" + "/?Action=DescribeClusterSecurityGroups&ClusterSecurityGroupName=unknown" ) if is_json: describe_params += "&ContentType=JSON" res = test_client.get(describe_params) - res.status_code.should.equal(400) + assert res.status_code == 400 if is_json: response = json.loads(res.data.decode("utf-8")) @@ -284,8 +276,8 @@ def test_describe_unknown_cluster_security_group(is_json): ] error = response["Error"] - error["Code"].should.equal("ClusterSecurityGroupNotFound") - error["Message"].should.equal("Security group unknown not found.") + assert error["Code"] == "ClusterSecurityGroupNotFound" + assert error["Message"] == "Security group unknown not found." @pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"]) @@ -312,15 +304,15 @@ def test_create_cluster_with_security_group(is_json): response = xmltodict.parse(response, dict_constructor=dict) del response["CreateClusterSecurityGroupResponse"]["ResponseMetadata"] - response.should.have.key("CreateClusterSecurityGroupResponse") + assert "CreateClusterSecurityGroupResponse" in response response = response["CreateClusterSecurityGroupResponse"] - response.should.have.key("CreateClusterSecurityGroupResult") + assert "CreateClusterSecurityGroupResult" in response result = response["CreateClusterSecurityGroupResult"] - result.should.have.key("ClusterSecurityGroup") + assert "ClusterSecurityGroup" in result sg = result["ClusterSecurityGroup"] - sg.should.have.key("ClusterSecurityGroupName").being.equal(csg) - sg.should.have.key("Description").being.equal("desc for " + csg) - sg.should.have.key("EC2SecurityGroups").being.equal([] if is_json else None) + assert sg["ClusterSecurityGroupName"] == csg + assert sg["Description"] == "desc for " + csg + assert sg["EC2SecurityGroups"] == ([] if is_json else None) # Create Cluster with these security groups create_params = ( @@ -344,17 +336,15 @@ def test_create_cluster_with_security_group(is_json): result = xmltodict.parse(result, dict_constructor=dict) del result["CreateClusterResponse"]["ResponseMetadata"] - result.should.have.key("CreateClusterResponse") - result["CreateClusterResponse"].should.have.key("CreateClusterResult") - result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster") + assert "CreateClusterResponse" in result + assert "CreateClusterResult" in result["CreateClusterResponse"] + assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"] result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"] security_groups = result["ClusterSecurityGroups"] if not is_json: security_groups = security_groups["item"] - security_groups.should.have.length_of(2) + assert len(security_groups) == 2 for csg in security_group_names: - security_groups.should.contain( - {"ClusterSecurityGroupName": csg, "Status": "active"} - ) + assert {"ClusterSecurityGroupName": csg, "Status": "active"} in security_groups