diff --git a/moto/apigateway/models.py b/moto/apigateway/models.py
index a8db6431e..7e90765a2 100644
--- a/moto/apigateway/models.py
+++ b/moto/apigateway/models.py
@@ -1,14 +1,15 @@
-import string
-import re
-import responses
-import requests
-import time
-from datetime import datetime
from collections import defaultdict
-from openapi_spec_validator import validate_spec
+from datetime import datetime
+import re
+import string
+import time
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
+from openapi_spec_validator import validate_spec
+import requests
+import responses
+
try:
from openapi_spec_validator.validation.exceptions import OpenAPIValidationError
except ImportError:
@@ -1511,7 +1512,7 @@ class APIGatewayBackend(BaseBackend):
integrationHttpMethod="GET"
)
deploy_url = f"https://{api_id}.execute-api.us-east-1.amazonaws.com/dev"
- requests.get(deploy_url).content.should.equal(b"a fake response")
+ assert requests.get(deploy_url).content == b"a fake response"
Limitations:
- Integrations of type HTTP are supported
diff --git a/moto/athena/models.py b/moto/athena/models.py
index 09247a034..4764116fd 100644
--- a/moto/athena/models.py
+++ b/moto/athena/models.py
@@ -1,12 +1,13 @@
-import time
from datetime import datetime
+import time
+from typing import Any, Dict, List, Optional
+
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.moto_api._internal import mock_random
from moto.utilities.paginator import paginate
-from typing import Any, Dict, List, Optional
-class TaggableResourceMixin(object):
+class TaggableResourceMixin:
# This mixing was copied from Redshift when initially implementing
# Athena. TBD if it's worth the overhead.
@@ -263,7 +264,7 @@ class AthenaBackend(BaseBackend):
"http://motoapi.amazonaws.com:5000/moto-api/static/athena/query-results",
json=expected_results,
)
- resp.status_code.should.equal(201)
+ assert resp.status_code == 201
client = boto3.client("athena", region_name="us-east-1")
details = client.get_query_execution(QueryExecutionId="any_id")["QueryExecution"]
diff --git a/moto/rdsdata/models.py b/moto/rdsdata/models.py
index bb85a6052..45964f8a2 100644
--- a/moto/rdsdata/models.py
+++ b/moto/rdsdata/models.py
@@ -1,7 +1,7 @@
-from moto.core import BaseBackend, BackendDict
-
from typing import Any, Dict, List, Optional, Tuple
+from moto.core import BaseBackend, BackendDict
+
class QueryResults:
def __init__(
@@ -66,7 +66,7 @@ class RDSDataServiceBackend(BaseBackend):
"http://motoapi.amazonaws.com:5000/moto-api/static/rds-data/statement-results",
json=expected_results,
)
- resp.status_code.should.equal(201)
+ assert resp.status_code == 201
rdsdata = boto3.client("rds-data", region_name="us-east-1")
resp = rdsdata.execute_statement(resourceArn="not applicable", secretArn="not applicable", sql="SELECT some FROM thing")
diff --git a/moto/redshift/models.py b/moto/redshift/models.py
index 9e6d1fe70..ff31fc59a 100644
--- a/moto/redshift/models.py
+++ b/moto/redshift/models.py
@@ -1,8 +1,10 @@
+from collections import OrderedDict
import copy
import datetime
-
-from collections import OrderedDict
from typing import Any, Dict, Iterable, List, Optional
+
+from dateutil.tz import tzutc
+
from moto.core import BaseBackend, BackendDict, BaseModel, CloudFormationModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.ec2 import ec2_backends
@@ -46,7 +48,10 @@ class TaggableResourceMixin:
@property
def arn(self) -> str:
- return f"arn:aws:redshift:{self.region}:{self.account_id}:{self.resource_type}:{self.resource_id}"
+ return (
+ f"arn:aws:redshift:{self.region}:{self.account_id}"
+ f":{self.resource_type}:{self.resource_id}"
+ )
def create_tags(self, tags: List[Dict[str, str]]) -> List[Dict[str, str]]:
new_keys = [tag_set["Key"] for tag_set in tags]
@@ -95,7 +100,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel):
self.redshift_backend = redshift_backend
self.cluster_identifier = cluster_identifier
self.create_time = iso_8601_datetime_with_milliseconds(
- datetime.datetime.utcnow()
+ datetime.datetime.now(tzutc())
)
self.status = "available"
self.node_type = node_type
@@ -220,7 +225,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel):
if attribute_name == "Endpoint.Address":
return self.endpoint
- elif attribute_name == "Endpoint.Port":
+ if attribute_name == "Endpoint.Port":
return self.port
raise UnformattedGetAttTemplateException()
@@ -528,7 +533,7 @@ class Snapshot(TaggableResourceMixin, BaseModel):
self.snapshot_type = snapshot_type
self.status = "available"
self.create_time = iso_8601_datetime_with_milliseconds(
- datetime.datetime.utcnow()
+ datetime.datetime.now(tzutc())
)
self.iam_roles_arn = iam_roles_arn or []
@@ -622,8 +627,7 @@ class RedshiftBackend(BaseBackend):
}
cluster.cluster_snapshot_copy_status = status
return cluster
- else:
- raise SnapshotCopyAlreadyEnabledFaultError(cluster_identifier)
+ raise SnapshotCopyAlreadyEnabledFaultError(cluster_identifier)
def disable_snapshot_copy(self, **kwargs: Any) -> Cluster:
cluster_identifier = kwargs["cluster_identifier"]
@@ -631,8 +635,7 @@ class RedshiftBackend(BaseBackend):
if cluster.cluster_snapshot_copy_status is not None:
cluster.cluster_snapshot_copy_status = None
return cluster
- else:
- raise SnapshotCopyAlreadyDisabledFaultError(cluster_identifier)
+ raise SnapshotCopyAlreadyDisabledFaultError(cluster_identifier)
def modify_snapshot_copy_retention_period(
self, cluster_identifier: str, retention_period: str
@@ -650,7 +653,10 @@ class RedshiftBackend(BaseBackend):
raise ClusterAlreadyExistsFaultError()
cluster = Cluster(self, **cluster_kwargs)
self.clusters[cluster_identifier] = cluster
- snapshot_id = f"rs:{cluster_identifier}-{datetime.datetime.utcnow().strftime('%Y-%m-%d-%H-%M')}"
+ snapshot_id = (
+ f"rs:{cluster_identifier}-"
+ f"{datetime.datetime.now(tzutc()).strftime('%Y-%m-%d-%H-%M')}"
+ )
# Automated snapshots don't copy over the tags
self.create_cluster_snapshot(
cluster_identifier,
@@ -679,8 +685,7 @@ class RedshiftBackend(BaseBackend):
if cluster_identifier:
if cluster_identifier in self.clusters:
return [self.clusters[cluster_identifier]]
- else:
- raise ClusterNotFoundError(cluster_identifier)
+ raise ClusterNotFoundError(cluster_identifier)
return list(self.clusters.values())
def modify_cluster(self, **cluster_kwargs: Any) -> Cluster:
@@ -739,9 +744,10 @@ class RedshiftBackend(BaseBackend):
and cluster_snapshot_identifer is None
):
raise InvalidParameterCombinationError(
- "FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified."
+ "FinalClusterSnapshotIdentifier is required unless "
+ "SkipFinalClusterSnapshot is specified."
)
- elif (
+ if (
cluster_skip_final_snapshot is False
and cluster_snapshot_identifer is not None
): # create snapshot
@@ -781,8 +787,7 @@ class RedshiftBackend(BaseBackend):
if subnet_identifier:
if subnet_identifier in self.subnet_groups:
return [self.subnet_groups[subnet_identifier]]
- else:
- raise ClusterSubnetGroupNotFoundError(subnet_identifier)
+ raise ClusterSubnetGroupNotFoundError(subnet_identifier)
return list(self.subnet_groups.values())
def delete_cluster_subnet_group(self, subnet_identifier: str) -> SubnetGroup:
@@ -812,8 +817,7 @@ class RedshiftBackend(BaseBackend):
if security_group_name:
if security_group_name in self.security_groups:
return [self.security_groups[security_group_name]]
- else:
- raise ClusterSecurityGroupNotFoundError(security_group_name)
+ raise ClusterSecurityGroupNotFoundError(security_group_name)
return list(self.security_groups.values())
def delete_cluster_security_group(
@@ -861,8 +865,7 @@ class RedshiftBackend(BaseBackend):
if parameter_group_name:
if parameter_group_name in self.parameter_groups:
return [self.parameter_groups[parameter_group_name]]
- else:
- raise ClusterParameterGroupNotFoundError(parameter_group_name)
+ raise ClusterParameterGroupNotFoundError(parameter_group_name)
return list(self.parameter_groups.values())
def delete_cluster_parameter_group(
@@ -983,8 +986,7 @@ class RedshiftBackend(BaseBackend):
if snapshot_copy_grant_name:
if snapshot_copy_grant_name in self.snapshot_copy_grants:
return [self.snapshot_copy_grants[snapshot_copy_grant_name]]
- else:
- raise SnapshotCopyGrantNotFoundFaultError(snapshot_copy_grant_name)
+ raise SnapshotCopyGrantNotFoundFaultError(snapshot_copy_grant_name)
return copy_grants
def _get_resource_from_arn(self, arn: str) -> TaggableResourceMixin:
@@ -1000,16 +1002,16 @@ class RedshiftBackend(BaseBackend):
resources = self.RESOURCE_TYPE_MAP.get(resource_type)
if resources is None:
message = (
- f"Tagging is not supported for this type of resource: '{resource_type}' "
- "(the ARN is potentially malformed, please check the ARN documentation for more information)"
+ "Tagging is not supported for this type of resource: "
+ f"'{resource_type}' (the ARN is potentially malformed, "
+ "please check the ARN documentation for more information)"
)
raise ResourceNotFoundFaultError(message=message)
try:
resource = resources[resource_id]
except KeyError:
raise ResourceNotFoundFaultError(resource_type, resource_id)
- else:
- return resource
+ return resource
@staticmethod
def _describe_tags_for_resources(resources: Iterable[Any]) -> List[Dict[str, Any]]: # type: ignore[misc]
@@ -1081,7 +1083,7 @@ class RedshiftBackend(BaseBackend):
raise InvalidParameterValueError(
"Token duration must be between 900 and 3600 seconds"
)
- expiration = datetime.datetime.utcnow() + datetime.timedelta(
+ expiration = datetime.datetime.now(tzutc()) + datetime.timedelta(
0, duration_seconds
)
if cluster_identifier in self.clusters:
@@ -1092,8 +1094,7 @@ class RedshiftBackend(BaseBackend):
"DbPassword": mock_random.get_random_string(32),
"Expiration": expiration,
}
- else:
- raise ClusterNotFoundError(cluster_identifier)
+ raise ClusterNotFoundError(cluster_identifier)
redshift_backends = BackendDict(RedshiftBackend, "redshift")
diff --git a/requirements-tests.txt b/requirements-tests.txt
index f01abcfeb..5b1593c16 100644
--- a/requirements-tests.txt
+++ b/requirements-tests.txt
@@ -3,6 +3,5 @@ pytest
pytest-cov
pytest-ordering
pytest-xdist
-surer
freezegun
pylint
diff --git a/tests/test_core/test_importorder.py b/tests/test_core/test_importorder.py
index 526dd90ac..af704aa2c 100644
--- a/tests/test_core/test_importorder.py
+++ b/tests/test_core/test_importorder.py
@@ -1,15 +1,17 @@
+from unittest import SkipTest
+
import boto3
import pytest
+
from moto import mock_s3
from moto import settings
-from unittest import SkipTest
@pytest.fixture(scope="function", name="aws_credentials")
def fixture_aws_credentials(monkeypatch):
+ """Mocked AWS Credentials for moto."""
if settings.TEST_SERVER_MODE:
raise SkipTest("No point in testing this in ServerMode.")
- """Mocked AWS Credentials for moto."""
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
@@ -63,8 +65,7 @@ def test_mock_works_with_resource_created_outside(
patch_resource(outside_resource)
- b = list(outside_resource.buckets.all())
- assert b == []
+ assert not list(outside_resource.buckets.all())
m.stop()
@@ -126,7 +127,7 @@ def test_mock_works_when_replacing_client(
try:
logic.do_important_things()
except Exception as e:
- str(e).should.contain("InvalidAccessKeyId")
+ assert str(e) == "InvalidAccessKeyId"
client_initialized_after_mock = boto3.client("s3", region_name="us-east-1")
logic._s3 = client_initialized_after_mock
diff --git a/tests/test_redshift/test_redshift.py b/tests/test_redshift/test_redshift.py
index 9ca7eacc8..2d6baa7a2 100644
--- a/tests/test_redshift/test_redshift.py
+++ b/tests/test_redshift/test_redshift.py
@@ -1,10 +1,11 @@
-import time
import datetime
+import re
+import time
import boto3
from botocore.exceptions import ClientError
+from dateutil.tz import tzutc
import pytest
-import sure # noqa # pylint: disable=unused-import
from moto import mock_ec2
from moto import mock_redshift
@@ -23,38 +24,36 @@ def test_create_cluster_boto3():
MasterUserPassword="password",
)
cluster = response["Cluster"]
- cluster["ClusterIdentifier"].should.equal("test")
- cluster["NodeType"].should.equal("ds2.xlarge")
- cluster["ClusterStatus"].should.equal("creating")
+ assert cluster["ClusterIdentifier"] == "test"
+ assert cluster["NodeType"] == "ds2.xlarge"
+ assert cluster["ClusterStatus"] == "creating"
create_time = cluster["ClusterCreateTime"]
- create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo))
- create_time.should.be.greater_than(
+ assert create_time < datetime.datetime.now(create_time.tzinfo)
+ assert create_time > (
datetime.datetime.now(create_time.tzinfo) - datetime.timedelta(minutes=1)
)
- cluster["MasterUsername"].should.equal("user")
- cluster["DBName"].should.equal("test")
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(1)
- cluster["ClusterSecurityGroups"].should.equal(
- [{"ClusterSecurityGroupName": "Default", "Status": "active"}]
- )
- cluster["VpcSecurityGroups"].should.equal([])
- cluster["ClusterParameterGroups"].should.equal(
- [
- {
- "ParameterGroupName": "default.redshift-1.0",
- "ParameterApplyStatus": "in-sync",
- }
- ]
- )
- cluster["ClusterSubnetGroupName"].should.equal("")
- cluster["AvailabilityZone"].should.equal("us-east-1a")
- cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:03:30")
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["NumberOfNodes"].should.equal(1)
- cluster["EnhancedVpcRouting"].should.equal(False)
- cluster["KmsKeyId"].should.equal("")
- cluster["Endpoint"]["Port"].should.equal(5439)
+ assert cluster["MasterUsername"] == "user"
+ assert cluster["DBName"] == "test"
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 1
+ assert cluster["ClusterSecurityGroups"] == [
+ {"ClusterSecurityGroupName": "Default", "Status": "active"}
+ ]
+ assert cluster["VpcSecurityGroups"] == []
+ assert cluster["ClusterParameterGroups"] == [
+ {
+ "ParameterGroupName": "default.redshift-1.0",
+ "ParameterApplyStatus": "in-sync",
+ }
+ ]
+ assert cluster["ClusterSubnetGroupName"] == ""
+ assert cluster["AvailabilityZone"] == "us-east-1a"
+ assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:03:30"
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["NumberOfNodes"] == 1
+ assert cluster["EnhancedVpcRouting"] is False
+ assert cluster["KmsKeyId"] == ""
+ assert cluster["Endpoint"]["Port"] == 5439
@mock_redshift
@@ -69,13 +68,13 @@ def test_create_cluster_with_enhanced_vpc_routing_enabled():
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
- response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
+ assert response["Cluster"]["NodeType"] == "ds2.xlarge"
create_time = response["Cluster"]["ClusterCreateTime"]
- create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo))
- create_time.should.be.greater_than(
+ assert create_time < datetime.datetime.now(create_time.tzinfo)
+ assert create_time > (
datetime.datetime.now(create_time.tzinfo) - datetime.timedelta(minutes=1)
)
- response["Cluster"]["EnhancedVpcRouting"].should.equal(True)
+ assert response["Cluster"]["EnhancedVpcRouting"] is True
@mock_redshift
@@ -93,14 +92,14 @@ def test_create_and_describe_cluster_with_kms_key_id():
MasterUserPassword="password",
KmsKeyId=kms_key_id,
)
- response["Cluster"]["KmsKeyId"].should.equal(kms_key_id)
+ assert response["Cluster"]["KmsKeyId"] == kms_key_id
response = client.describe_clusters()
clusters = response.get("Clusters", [])
- len(clusters).should.equal(1)
+ assert len(clusters) == 1
cluster = clusters[0]
- cluster["KmsKeyId"].should.equal(kms_key_id)
+ assert cluster["KmsKeyId"] == kms_key_id
@mock_redshift
@@ -109,14 +108,13 @@ def test_create_snapshot_copy_grant():
grants = client.create_snapshot_copy_grant(
SnapshotCopyGrantName="test-us-east-1", KmsKeyId="fake"
)
- grants["SnapshotCopyGrant"]["SnapshotCopyGrantName"].should.equal("test-us-east-1")
- grants["SnapshotCopyGrant"]["KmsKeyId"].should.equal("fake")
+ assert grants["SnapshotCopyGrant"]["SnapshotCopyGrantName"] == "test-us-east-1"
+ assert grants["SnapshotCopyGrant"]["KmsKeyId"] == "fake"
client.delete_snapshot_copy_grant(SnapshotCopyGrantName="test-us-east-1")
- client.describe_snapshot_copy_grants.when.called_with(
- SnapshotCopyGrantName="test-us-east-1"
- ).should.throw(ClientError)
+ with pytest.raises(ClientError):
+ client.describe_snapshot_copy_grants(SnapshotCopyGrantName="test-us-east-1")
@mock_redshift
@@ -128,14 +126,14 @@ def test_create_many_snapshot_copy_grants():
SnapshotCopyGrantName=f"test-us-east-1-{i}", KmsKeyId="fake"
)
response = client.describe_snapshot_copy_grants()
- len(response["SnapshotCopyGrants"]).should.equal(10)
+ assert len(response["SnapshotCopyGrants"]) == 10
@mock_redshift
def test_no_snapshot_copy_grants():
client = boto3.client("redshift", region_name="us-east-1")
response = client.describe_snapshot_copy_grants()
- len(response["SnapshotCopyGrants"]).should.equal(0)
+ assert len(response["SnapshotCopyGrants"]) == 0
@mock_redshift
@@ -164,64 +162,63 @@ def test_create_cluster_all_attributes():
NumberOfNodes=3,
)
cluster = cluster_response["Cluster"]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NodeType"].should.equal("dc1.large")
- cluster["ClusterStatus"].should.equal("creating")
- # cluster["ClusterAvailabilityStatus"].should.equal("Modifying")
- cluster["MasterUsername"].should.equal("username")
- cluster["DBName"].should.equal("my_db")
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(10)
- # cluster["ManualSnapshotRetentionPeriod"].should.equal(-1)
- # cluster["ClusterSecurityGroups"].should.equal([])
- cluster["ClusterParameterGroups"].should.have.length_of(1)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NodeType"] == "dc1.large"
+ assert cluster["ClusterStatus"] == "creating"
+ # assert cluster["ClusterAvailabilityStatus"] == "Modifying"
+ assert cluster["MasterUsername"] == "username"
+ assert cluster["DBName"] == "my_db"
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 10
+ # assert cluster["ManualSnapshotRetentionPeriod"] == -1
+ # assert cluster["ClusterSecurityGroups"] == []
+ assert len(cluster["ClusterParameterGroups"]) == 1
param_group = cluster["ClusterParameterGroups"][0]
- param_group.should.equal(
- {
- "ParameterGroupName": "default.redshift-1.0",
- "ParameterApplyStatus": "in-sync",
- }
- )
- # cluster["ClusterSubnetGroupName"].should.equal("default")
- cluster["AvailabilityZone"].should.equal("us-east-1d")
- cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:11:00")
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["NumberOfNodes"].should.equal(3)
- # cluster["PubliclyAccessible"].should.equal(True)
- cluster["Encrypted"].should.equal(False)
- cluster["EnhancedVpcRouting"].should.equal(False)
+ assert param_group == {
+ "ParameterGroupName": "default.redshift-1.0",
+ "ParameterApplyStatus": "in-sync",
+ }
+ # assert cluster["ClusterSubnetGroupName"] == "default"
+ assert cluster["AvailabilityZone"] == "us-east-1d"
+ assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:11:00"
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["NumberOfNodes"] == 3
+ # assert cluster["PubliclyAccessible"] is True
+ assert cluster["Encrypted"] is False
+ assert cluster["EnhancedVpcRouting"] is False
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
# AWS returns 'Available' (upper cased)
- cluster["ClusterStatus"].should.equal("available")
- # cluster["ClusterAvailabilityStatus"].should.equal("Available")
- cluster["NodeType"].should.equal("dc1.large")
- cluster["MasterUsername"].should.equal("username")
- cluster["DBName"].should.equal("my_db")
+ assert cluster["ClusterStatus"] == "available"
+ # assert cluster["ClusterAvailabilityStatus"] == "Available"
+ assert cluster["NodeType"] == "dc1.large"
+ assert cluster["MasterUsername"] == "username"
+ assert cluster["DBName"] == "my_db"
# AWS returns: ClusterSecurityGroups=[]
- # cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"].should.equal("Default")
+ # assert cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"] == "Default"
# AWS returns default sg: [{'VpcSecurityGroupId': 'sg-...', 'Status': 'active'}],
- # cluster["VpcSecurityGroups"].should.equal([])
- # cluster["ClusterSubnetGroupName"].should.equal("default")
+ # assert cluster["VpcSecurityGroups"] == []
+ # assert cluster["ClusterSubnetGroupName"] == "default"
# AWS returns default VPC ID
- # cluster["VpcId"].should.equal("vpc-...")
- cluster["AvailabilityZone"].should.equal("us-east-1d")
- cluster["PreferredMaintenanceWindow"].should.equal("Mon:03:00-Mon:11:00")
- cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal(
+ # assert cluster["VpcId"] == "vpc-..."
+ assert cluster["AvailabilityZone"] == "us-east-1d"
+ assert cluster["PreferredMaintenanceWindow"] == "Mon:03:00-Mon:11:00"
+ assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == (
"default.redshift-1.0"
)
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(10)
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 10
# Endpoint only returned when ClusterStatus=Available
- cluster["Endpoint"]["Address"].should.match(
- f"{cluster_identifier}.[a-z0-9]+.{region}.redshift.amazonaws.com"
+ assert re.match(
+ f"{cluster_identifier}.[a-z0-9]+.{region}.redshift.amazonaws.com",
+ cluster["Endpoint"]["Address"],
)
- cluster["Endpoint"]["Port"].should.equal(1234)
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["NumberOfNodes"].should.equal(3)
+ assert cluster["Endpoint"]["Port"] == 1234
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["NumberOfNodes"] == 3
@mock_redshift
@@ -237,17 +234,17 @@ def test_create_single_node_cluster_boto3():
DBName="my_db",
ClusterType="single-node",
)["Cluster"]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NumberOfNodes"].should.equal(1)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NumberOfNodes"] == 1
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NodeType"].should.equal("dw.hs1.xlarge")
- cluster["MasterUsername"].should.equal("username")
- cluster["DBName"].should.equal("my_db")
- cluster["NumberOfNodes"].should.equal(1)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NodeType"] == "dw.hs1.xlarge"
+ assert cluster["MasterUsername"] == "username"
+ assert cluster["DBName"] == "my_db"
+ assert cluster["NumberOfNodes"] == 1
@mock_redshift
@@ -273,7 +270,7 @@ def test_create_cluster_in_subnet_group():
cluster_response = client.describe_clusters(ClusterIdentifier="my_cluster")
cluster = cluster_response["Clusters"][0]
- cluster["ClusterSubnetGroupName"].should.equal("my_subnet_group")
+ assert cluster["ClusterSubnetGroupName"] == "my_subnet_group"
@mock_redshift
@@ -299,7 +296,7 @@ def test_create_cluster_in_subnet_group_boto3():
cluster_response = client.describe_clusters(ClusterIdentifier="my_cluster")
cluster = cluster_response["Clusters"][0]
- cluster["ClusterSubnetGroupName"].should.equal("my_subnet_group")
+ assert cluster["ClusterSubnetGroupName"] == "my_subnet_group"
@mock_redshift
@@ -327,7 +324,7 @@ def test_create_cluster_with_security_group_boto3():
group_names = [
group["ClusterSecurityGroupName"] for group in cluster["ClusterSecurityGroups"]
]
- set(group_names).should.equal({"security_group1", "security_group2"})
+ assert set(group_names) == {"security_group1", "security_group2"}
@mock_redshift
@@ -350,7 +347,7 @@ def test_create_cluster_with_vpc_security_groups_boto3():
response = client.describe_clusters(ClusterIdentifier=cluster_id)
cluster = response["Clusters"][0]
group_ids = [group["VpcSecurityGroupId"] for group in cluster["VpcSecurityGroups"]]
- list(group_ids).should.equal([security_group.id])
+ assert list(group_ids) == [security_group.id]
@mock_redshift
@@ -368,7 +365,7 @@ def test_create_cluster_with_iam_roles():
response = client.describe_clusters(ClusterIdentifier=cluster_id)
cluster = response["Clusters"][0]
iam_roles = [role["IamRoleArn"] for role in cluster["IamRoles"]]
- iam_roles_arn.should.equal(iam_roles)
+ assert iam_roles_arn == iam_roles
@mock_redshift
@@ -380,10 +377,10 @@ def test_create_cluster_with_parameter_group_boto3():
ParameterGroupFamily="redshift-1.0",
Description="This is my group",
)["ClusterParameterGroup"]
- group["ParameterGroupName"].should.equal("my-parameter-group")
- group["ParameterGroupFamily"].should.equal("redshift-1.0")
- group["Description"].should.equal("This is my group")
- group["Tags"].should.equal([])
+ assert group["ParameterGroupName"] == "my-parameter-group"
+ assert group["ParameterGroupFamily"] == "redshift-1.0"
+ assert group["Description"] == "This is my group"
+ assert group["Tags"] == []
cluster = client.create_cluster(
ClusterIdentifier=cluster_id,
@@ -393,19 +390,19 @@ def test_create_cluster_with_parameter_group_boto3():
ClusterType="single-node",
ClusterParameterGroupName="my-parameter-group",
)["Cluster"]
- cluster["ClusterParameterGroups"].should.have.length_of(1)
- cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal(
+ assert len(cluster["ClusterParameterGroups"]) == 1
+ assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == (
"my-parameter-group"
)
- cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync")
+ assert cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"] == "in-sync"
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_id)
cluster = cluster_response["Clusters"][0]
- cluster["ClusterParameterGroups"].should.have.length_of(1)
- cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal(
+ assert len(cluster["ClusterParameterGroups"]) == 1
+ assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == (
"my-parameter-group"
)
- cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync")
+ assert cluster["ClusterParameterGroups"][0]["ParameterApplyStatus"] == "in-sync"
@mock_redshift
@@ -414,8 +411,8 @@ def test_describe_non_existent_cluster_boto3():
with pytest.raises(ClientError) as ex:
client.describe_clusters(ClusterIdentifier="not-a-cluster")
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterNotFound")
- err["Message"].should.equal("Cluster not-a-cluster not found.")
+ assert err["Code"] == "ClusterNotFound"
+ assert err["Message"] == "Cluster not-a-cluster not found."
@mock_redshift
@@ -434,7 +431,7 @@ def test_modify_cluster_vpc_routing():
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["EnhancedVpcRouting"].should.equal(False)
+ assert cluster["EnhancedVpcRouting"] is False
client.create_cluster_security_group(
ClusterSecurityGroupName="security_group", Description="security_group"
@@ -463,14 +460,14 @@ def test_modify_cluster_vpc_routing():
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NodeType"].should.equal("ds2.8xlarge")
- cluster["PreferredMaintenanceWindow"].should.equal("Tue:03:00-Tue:11:00")
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(7)
- cluster["AllowVersionUpgrade"].should.equal(False)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NodeType"] == "ds2.8xlarge"
+ assert cluster["PreferredMaintenanceWindow"] == "Tue:03:00-Tue:11:00"
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 7
+ assert cluster["AllowVersionUpgrade"] is False
# This one should remain unmodified.
- cluster["NumberOfNodes"].should.equal(3)
- cluster["EnhancedVpcRouting"].should.equal(True)
+ assert cluster["NumberOfNodes"] == 3
+ assert cluster["EnhancedVpcRouting"] is True
@mock_redshift
@@ -496,7 +493,7 @@ def test_modify_cluster_boto3():
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["EnhancedVpcRouting"].should.equal(False)
+ assert cluster["EnhancedVpcRouting"] is False
client.modify_cluster(
ClusterIdentifier=cluster_identifier,
@@ -514,18 +511,18 @@ def test_modify_cluster_boto3():
cluster_response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = cluster_response["Clusters"][0]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NodeType"].should.equal("dw.hs1.xlarge")
- cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"].should.equal(
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NodeType"] == "dw.hs1.xlarge"
+ assert cluster["ClusterSecurityGroups"][0]["ClusterSecurityGroupName"] == (
"security_group"
)
- cluster["PreferredMaintenanceWindow"].should.equal("Tue:03:00-Tue:11:00")
- cluster["ClusterParameterGroups"][0]["ParameterGroupName"].should.equal(
+ assert cluster["PreferredMaintenanceWindow"] == "Tue:03:00-Tue:11:00"
+ assert cluster["ClusterParameterGroups"][0]["ParameterGroupName"] == (
"my_parameter_group"
)
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(7)
- cluster["AllowVersionUpgrade"].should.equal(False)
- cluster["NumberOfNodes"].should.equal(4)
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 7
+ assert cluster["AllowVersionUpgrade"] is False
+ assert cluster["NumberOfNodes"] == 4
@mock_redshift
@@ -548,10 +545,10 @@ def test_create_cluster_subnet_group():
)
my_subnet = subnets_response["ClusterSubnetGroups"][0]
- my_subnet["ClusterSubnetGroupName"].should.equal("my_subnet_group")
- my_subnet["Description"].should.equal("This is my subnet group")
+ assert my_subnet["ClusterSubnetGroupName"] == "my_subnet_group"
+ assert my_subnet["Description"] == "This is my subnet group"
subnet_ids = [subnet["SubnetIdentifier"] for subnet in my_subnet["Subnets"]]
- set(subnet_ids).should.equal(set([subnet1.id, subnet2.id]))
+ assert set(subnet_ids) == set([subnet1.id, subnet2.id])
@mock_redshift
@@ -617,8 +614,8 @@ def test_create_invalid_cluster_subnet_group_boto3():
SubnetIds=["subnet-1234"],
)
err = ex.value.response["Error"]
- err["Code"].should.equal("InvalidSubnet")
- err["Message"].should.match(r"Subnet \[[a-z0-9-']+\] not found.")
+ assert err["Code"] == "InvalidSubnet"
+ assert re.match(r"Subnet \[[a-z0-9-']+\] not found.", err["Message"])
@mock_redshift
@@ -628,8 +625,8 @@ def test_describe_non_existent_subnet_group_boto3():
with pytest.raises(ClientError) as ex:
client.describe_cluster_subnet_groups(ClusterSubnetGroupName="my_subnet")
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterSubnetGroupNotFound")
- err["Message"].should.equal("Subnet group my_subnet not found.")
+ assert err["Code"] == "ClusterSubnetGroupNotFound"
+ assert err["Message"] == "Subnet group my_subnet not found."
@mock_redshift
@@ -648,18 +645,17 @@ def test_delete_cluster_subnet_group():
subnets_response = client.describe_cluster_subnet_groups()
subnets = subnets_response["ClusterSubnetGroups"]
- subnets.should.have.length_of(1)
+ assert len(subnets) == 1
client.delete_cluster_subnet_group(ClusterSubnetGroupName="my_subnet_group")
subnets_response = client.describe_cluster_subnet_groups()
subnets = subnets_response["ClusterSubnetGroups"]
- subnets.should.have.length_of(0)
+ assert len(subnets) == 0
# Delete invalid id
- client.delete_cluster_subnet_group.when.called_with(
- ClusterSubnetGroupName="not-a-subnet-group"
- ).should.throw(ClientError)
+ with pytest.raises(ClientError):
+ client.delete_cluster_subnet_group(ClusterSubnetGroupName="not-a-subnet-group")
@mock_redshift
@@ -670,22 +666,22 @@ def test_create_cluster_security_group_boto3():
Description="This is my security group",
Tags=[{"Key": "tag_key", "Value": "tag_value"}],
)["ClusterSecurityGroup"]
- group["ClusterSecurityGroupName"].should.equal("my_security_group")
- group["Description"].should.equal("This is my security group")
- group["EC2SecurityGroups"].should.equal([])
- group["IPRanges"].should.equal([])
- group["Tags"].should.equal([{"Key": "tag_key", "Value": "tag_value"}])
+ assert group["ClusterSecurityGroupName"] == "my_security_group"
+ assert group["Description"] == "This is my security group"
+ assert group["EC2SecurityGroups"] == []
+ assert group["IPRanges"] == []
+ assert group["Tags"] == [{"Key": "tag_key", "Value": "tag_value"}]
groups_response = client.describe_cluster_security_groups(
ClusterSecurityGroupName="my_security_group"
)
my_group = groups_response["ClusterSecurityGroups"][0]
- my_group["ClusterSecurityGroupName"].should.equal("my_security_group")
- my_group["Description"].should.equal("This is my security group")
- my_group["EC2SecurityGroups"].should.equal([])
- my_group["IPRanges"].should.equal([])
- my_group["Tags"].should.equal([{"Key": "tag_key", "Value": "tag_value"}])
+ assert my_group["ClusterSecurityGroupName"] == "my_security_group"
+ assert my_group["Description"] == "This is my security group"
+ assert my_group["EC2SecurityGroups"] == []
+ assert my_group["IPRanges"] == []
+ assert my_group["Tags"] == [{"Key": "tag_key", "Value": "tag_value"}]
@mock_redshift
@@ -695,8 +691,8 @@ def test_describe_non_existent_security_group_boto3():
with pytest.raises(ClientError) as ex:
client.describe_cluster_security_groups(ClusterSecurityGroupName="non-existent")
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterSecurityGroupNotFound")
- err["Message"].should.equal("Security group non-existent not found.")
+ assert err["Code"] == "ClusterSecurityGroupNotFound"
+ assert err["Message"] == "Security group non-existent not found."
@mock_redshift
@@ -708,12 +704,12 @@ def test_delete_cluster_security_group_boto3():
)
groups = client.describe_cluster_security_groups()["ClusterSecurityGroups"]
- groups.should.have.length_of(2) # The default group already exists
+ assert len(groups) == 2 # The default group already exists
client.delete_cluster_security_group(ClusterSecurityGroupName="my_security_group")
groups = client.describe_cluster_security_groups()["ClusterSecurityGroups"]
- groups.should.have.length_of(1)
+ assert len(groups) == 1
# Delete invalid id
with pytest.raises(ClientError) as ex:
@@ -721,8 +717,8 @@ def test_delete_cluster_security_group_boto3():
ClusterSecurityGroupName="not-a-security-group"
)
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterSecurityGroupNotFound")
- err["Message"].should.equal("Security group not-a-security-group not found.")
+ assert err["Code"] == "ClusterSecurityGroupNotFound"
+ assert err["Message"] == "Security group not-a-security-group not found."
@mock_redshift
@@ -733,19 +729,19 @@ def test_create_cluster_parameter_group_boto3():
ParameterGroupFamily="redshift-1.0",
Description="This is my group",
)["ClusterParameterGroup"]
- group["ParameterGroupName"].should.equal("my-parameter-group")
- group["ParameterGroupFamily"].should.equal("redshift-1.0")
- group["Description"].should.equal("This is my group")
- group["Tags"].should.equal([])
+ assert group["ParameterGroupName"] == "my-parameter-group"
+ assert group["ParameterGroupFamily"] == "redshift-1.0"
+ assert group["Description"] == "This is my group"
+ assert group["Tags"] == []
groups_response = client.describe_cluster_parameter_groups(
ParameterGroupName="my-parameter-group"
)
my_group = groups_response["ParameterGroups"][0]
- my_group["ParameterGroupName"].should.equal("my-parameter-group")
- my_group["ParameterGroupFamily"].should.equal("redshift-1.0")
- my_group["Description"].should.equal("This is my group")
+ assert my_group["ParameterGroupName"] == "my-parameter-group"
+ assert my_group["ParameterGroupFamily"] == "redshift-1.0"
+ assert my_group["Description"] == "This is my group"
@mock_redshift
@@ -756,8 +752,8 @@ def test_describe_non_existent_parameter_group_boto3():
ParameterGroupName="not-a-parameter-group"
)
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterParameterGroupNotFound")
- err["Message"].should.equal("Parameter group not-a-parameter-group not found.")
+ assert err["Code"] == "ClusterParameterGroupNotFound"
+ assert err["Message"] == "Parameter group not-a-parameter-group not found."
@mock_redshift
@@ -768,34 +764,34 @@ def test_delete_parameter_group_boto3():
ParameterGroupFamily="redshift-1.0",
Description="This is my group",
)
- client.describe_cluster_parameter_groups()["ParameterGroups"].should.have.length_of(
- 2
- )
+ assert len(client.describe_cluster_parameter_groups()["ParameterGroups"]) == 2
x = client.delete_cluster_parameter_group(ParameterGroupName="my-parameter-group")
del x["ResponseMetadata"]
- x.should.equal({})
+ assert x == {}
with pytest.raises(ClientError) as ex:
client.delete_cluster_parameter_group(ParameterGroupName="my-parameter-group")
err = ex.value.response["Error"]
- err["Code"].should.equal("ClusterParameterGroupNotFound")
+ assert err["Code"] == "ClusterParameterGroupNotFound"
# BUG: This is what AWS returns
- # err["Message"].should.equal("ParameterGroup not found: my-parameter-group")
- err["Message"].should.equal("Parameter group my-parameter-group not found.")
+ # assert err["Message"] == "ParameterGroup not found: my-parameter-group"
+ assert err["Message"] == "Parameter group my-parameter-group not found."
- client.describe_cluster_parameter_groups()["ParameterGroups"].should.have.length_of(
- 1
- )
+ assert len(client.describe_cluster_parameter_groups()["ParameterGroups"]) == 1
@mock_redshift
def test_create_cluster_snapshot_of_non_existent_cluster():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "non-existent-cluster-id"
- client.create_cluster_snapshot.when.called_with(
- SnapshotIdentifier="snapshot-id", ClusterIdentifier=cluster_identifier
- ).should.throw(ClientError, f"Cluster {cluster_identifier} not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.create_cluster_snapshot(
+ SnapshotIdentifier="snapshot-id", ClusterIdentifier=cluster_identifier
+ )
+ assert client_err.value.response["Error"]["Message"] == (
+ f"Cluster {cluster_identifier} not found."
+ )
@mock_redshift
@@ -814,15 +810,15 @@ def test_automated_snapshot_on_cluster_creation():
Tags=[{"Key": "tag_key", "Value": "tag_value"}],
)
- cluster_response["Cluster"]["Tags"].should.equal(
- [{"Key": "tag_key", "Value": "tag_value"}]
- )
+ assert cluster_response["Cluster"]["Tags"] == [
+ {"Key": "tag_key", "Value": "tag_value"}
+ ]
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier
)
- resp_auto_snap["Snapshots"][0]["SnapshotType"].should.equal("automated")
+ assert resp_auto_snap["Snapshots"][0]["SnapshotType"] == "automated"
# Tags from cluster are not copied over to automated snapshot
- resp_auto_snap["Snapshots"][0]["Tags"].should.equal([])
+ assert resp_auto_snap["Snapshots"][0]["Tags"] == []
@mock_redshift
@@ -839,17 +835,17 @@ def test_delete_automated_snapshot():
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
- cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
+ assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge"
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier
)
snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"]
# Delete automated snapshot should result in error
- client.delete_cluster_snapshot.when.called_with(
- SnapshotIdentifier=snapshot_identifier
- ).should.throw(
- ClientError,
- f"Cannot delete the snapshot {snapshot_identifier} because only manual snapshots may be deleted",
+ with pytest.raises(ClientError) as client_err:
+ client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)
+ assert client_err.value.response["Error"]["Message"] == (
+ f"Cannot delete the snapshot {snapshot_identifier} because only "
+ "manual snapshots may be deleted"
)
@@ -867,18 +863,18 @@ def test_presence_automated_snapshot_on_cluster_delete():
)
# Ensure automated snapshot is available
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- resp["Snapshots"].should.have.length_of(1)
+ assert len(resp["Snapshots"]) == 1
# Delete the cluster
cluster_response = client.delete_cluster(
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True
)
cluster = cluster_response["Cluster"]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
+ assert cluster["ClusterIdentifier"] == cluster_identifier
# Ensure Automated snapshot is deleted
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- resp["Snapshots"].should.have.length_of(0)
+ assert len(resp["Snapshots"]) == 0
@mock_redshift
@@ -896,7 +892,7 @@ def test_describe_snapshot_with_filter():
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
- cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
+ assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge"
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="automated"
)
@@ -908,30 +904,38 @@ def test_describe_snapshot_with_filter():
resp = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="automated"
)
- resp["Snapshots"].should.have.length_of(1)
+ assert len(resp["Snapshots"]) == 1
resp = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="manual"
)
- resp["Snapshots"].should.have.length_of(1)
+ assert len(resp["Snapshots"]) == 1
resp = client.describe_cluster_snapshots(
SnapshotIdentifier=snapshot_identifier, SnapshotType="manual"
)
- resp["Snapshots"].should.have.length_of(1)
+ assert len(resp["Snapshots"]) == 1
resp = client.describe_cluster_snapshots(
SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="automated"
)
- resp["Snapshots"].should.have.length_of(1)
+ assert len(resp["Snapshots"]) == 1
- client.describe_cluster_snapshots.when.called_with(
- SnapshotIdentifier=snapshot_identifier, SnapshotType="automated"
- ).should.throw(ClientError, f"Snapshot {snapshot_identifier} not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.describe_cluster_snapshots(
+ SnapshotIdentifier=snapshot_identifier, SnapshotType="automated"
+ )
+ assert client_err.value.response["Error"]["Message"] == (
+ f"Snapshot {snapshot_identifier} not found."
+ )
- client.describe_cluster_snapshots.when.called_with(
- SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="manual"
- ).should.throw(ClientError, f"Snapshot {auto_snapshot_identifier} not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.describe_cluster_snapshots(
+ SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="manual"
+ )
+ assert client_err.value.response["Error"]["Message"] == (
+ f"Snapshot {auto_snapshot_identifier} not found."
+ )
@mock_redshift
@@ -953,30 +957,32 @@ def test_create_cluster_from_automated_snapshot():
ClusterIdentifier=original_cluster_identifier, SnapshotType="automated"
)
auto_snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"]
- client.restore_from_cluster_snapshot.when.called_with(
- ClusterIdentifier=original_cluster_identifier,
- SnapshotIdentifier=auto_snapshot_identifier,
- ).should.throw(ClientError, "ClusterAlreadyExists")
+ with pytest.raises(ClientError) as client_err:
+ client.restore_from_cluster_snapshot(
+ ClusterIdentifier=original_cluster_identifier,
+ SnapshotIdentifier=auto_snapshot_identifier,
+ )
+ assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists"
response = client.restore_from_cluster_snapshot(
ClusterIdentifier=new_cluster_identifier,
SnapshotIdentifier=auto_snapshot_identifier,
Port=1234,
)
- response["Cluster"]["ClusterStatus"].should.equal("creating")
+ assert response["Cluster"]["ClusterStatus"] == "creating"
response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier)
new_cluster = response["Clusters"][0]
- new_cluster["NodeType"].should.equal("ds2.xlarge")
- new_cluster["MasterUsername"].should.equal("username")
- new_cluster["Endpoint"]["Port"].should.equal(1234)
- new_cluster["EnhancedVpcRouting"].should.equal(True)
+ assert new_cluster["NodeType"] == "ds2.xlarge"
+ assert new_cluster["MasterUsername"] == "username"
+ assert new_cluster["Endpoint"]["Port"] == 1234
+ assert new_cluster["EnhancedVpcRouting"] is True
# Make sure the new cluster has automated snapshot on cluster creation
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=new_cluster_identifier, SnapshotType="automated"
)
- resp_auto_snap["Snapshots"].should.have.length_of(1)
+ assert len(resp_auto_snap["Snapshots"]) == 1
@mock_redshift
@@ -994,7 +1000,7 @@ def test_create_cluster_snapshot():
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
- cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
+ assert cluster_response["Cluster"]["NodeType"] == "ds2.xlarge"
snapshot_response = client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
@@ -1002,11 +1008,11 @@ def test_create_cluster_snapshot():
Tags=[{"Key": "test-tag-key", "Value": "test-tag-value"}],
)
snapshot = snapshot_response["Snapshot"]
- snapshot["SnapshotIdentifier"].should.equal(snapshot_identifier)
- snapshot["ClusterIdentifier"].should.equal(cluster_identifier)
- snapshot["NumberOfNodes"].should.equal(1)
- snapshot["NodeType"].should.equal("ds2.xlarge")
- snapshot["MasterUsername"].should.equal("username")
+ assert snapshot["SnapshotIdentifier"] == snapshot_identifier
+ assert snapshot["ClusterIdentifier"] == cluster_identifier
+ assert snapshot["NumberOfNodes"] == 1
+ assert snapshot["NodeType"] == "ds2.xlarge"
+ assert snapshot["MasterUsername"] == "username"
@mock_redshift
@@ -1036,27 +1042,27 @@ def test_describe_cluster_snapshots():
SnapshotIdentifier=snapshot_identifier_1
)
snapshot_1 = resp_snap_1["Snapshots"][0]
- snapshot_1["SnapshotIdentifier"].should.equal(snapshot_identifier_1)
- snapshot_1["ClusterIdentifier"].should.equal(cluster_identifier)
- snapshot_1["NumberOfNodes"].should.equal(1)
- snapshot_1["NodeType"].should.equal("ds2.xlarge")
- snapshot_1["MasterUsername"].should.equal("username")
+ assert snapshot_1["SnapshotIdentifier"] == snapshot_identifier_1
+ assert snapshot_1["ClusterIdentifier"] == cluster_identifier
+ assert snapshot_1["NumberOfNodes"] == 1
+ assert snapshot_1["NodeType"] == "ds2.xlarge"
+ assert snapshot_1["MasterUsername"] == "username"
resp_snap_2 = client.describe_cluster_snapshots(
SnapshotIdentifier=snapshot_identifier_2
)
snapshot_2 = resp_snap_2["Snapshots"][0]
- snapshot_2["SnapshotIdentifier"].should.equal(snapshot_identifier_2)
- snapshot_2["ClusterIdentifier"].should.equal(cluster_identifier)
- snapshot_2["NumberOfNodes"].should.equal(1)
- snapshot_2["NodeType"].should.equal("ds2.xlarge")
- snapshot_2["MasterUsername"].should.equal("username")
+ assert snapshot_2["SnapshotIdentifier"] == snapshot_identifier_2
+ assert snapshot_2["ClusterIdentifier"] == cluster_identifier
+ assert snapshot_2["NumberOfNodes"] == 1
+ assert snapshot_2["NodeType"] == "ds2.xlarge"
+ assert snapshot_2["MasterUsername"] == "username"
resp_clust = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="manual"
)
- resp_clust["Snapshots"][0].should.equal(resp_snap_1["Snapshots"][0])
- resp_clust["Snapshots"][1].should.equal(resp_snap_2["Snapshots"][0])
+ assert resp_clust["Snapshots"][0] == resp_snap_1["Snapshots"][0]
+ assert resp_clust["Snapshots"][1] == resp_snap_2["Snapshots"][0]
@mock_redshift
@@ -1066,11 +1072,13 @@ def test_describe_cluster_snapshots_not_found_error():
snapshot_identifier = "non-existent-snapshot-id"
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- resp["Snapshots"].should.have.length_of(0)
+ assert len(resp["Snapshots"]) == 0
- client.describe_cluster_snapshots.when.called_with(
- SnapshotIdentifier=snapshot_identifier
- ).should.throw(ClientError, f"Snapshot {snapshot_identifier} not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.describe_cluster_snapshots(SnapshotIdentifier=snapshot_identifier)
+ assert client_err.value.response["Error"]["Message"] == (
+ f"Snapshot {snapshot_identifier} not found."
+ )
@mock_redshift
@@ -1091,19 +1099,24 @@ def test_delete_cluster_snapshot():
)
snapshots = client.describe_cluster_snapshots()["Snapshots"]
- list(snapshots).should.have.length_of(2)
+ assert len(list(snapshots)) == 2
- client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)["Snapshot"][
- "Status"
- ].should.equal("deleted")
+ assert (
+ client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)[
+ "Snapshot"
+ ]["Status"]
+ == "deleted"
+ )
snapshots = client.describe_cluster_snapshots()["Snapshots"]
- list(snapshots).should.have.length_of(1)
+ assert len(list(snapshots)) == 1
# Delete invalid id
- client.delete_cluster_snapshot.when.called_with(
- SnapshotIdentifier="non-existent"
- ).should.throw(ClientError, "Snapshot non-existent not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.delete_cluster_snapshot(SnapshotIdentifier="non-existent")
+ assert client_err.value.response["Error"]["Message"] == (
+ "Snapshot non-existent not found."
+ )
@mock_redshift
@@ -1125,9 +1138,14 @@ def test_cluster_snapshot_already_exists():
SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier
)
- client.create_cluster_snapshot.when.called_with(
- SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier
- ).should.throw(ClientError, f"{snapshot_identifier} already exists")
+ with pytest.raises(ClientError) as client_err:
+ client.create_cluster_snapshot(
+ SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier
+ )
+ assert (
+ f"{snapshot_identifier} already exists"
+ in client_err.value.response["Error"]["Message"]
+ )
@mock_redshift
@@ -1151,24 +1169,26 @@ def test_create_cluster_from_snapshot():
ClusterIdentifier=original_cluster_identifier,
)
- client.restore_from_cluster_snapshot.when.called_with(
- ClusterIdentifier=original_cluster_identifier,
- SnapshotIdentifier=original_snapshot_identifier,
- ).should.throw(ClientError, "ClusterAlreadyExists")
+ with pytest.raises(ClientError) as client_err:
+ client.restore_from_cluster_snapshot(
+ ClusterIdentifier=original_cluster_identifier,
+ SnapshotIdentifier=original_snapshot_identifier,
+ )
+ assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists"
response = client.restore_from_cluster_snapshot(
ClusterIdentifier=new_cluster_identifier,
SnapshotIdentifier=original_snapshot_identifier,
Port=1234,
)
- response["Cluster"]["ClusterStatus"].should.equal("creating")
+ assert response["Cluster"]["ClusterStatus"] == "creating"
response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier)
new_cluster = response["Clusters"][0]
- new_cluster["NodeType"].should.equal("ds2.xlarge")
- new_cluster["MasterUsername"].should.equal("username")
- new_cluster["Endpoint"]["Port"].should.equal(1234)
- new_cluster["EnhancedVpcRouting"].should.equal(True)
+ assert new_cluster["NodeType"] == "ds2.xlarge"
+ assert new_cluster["MasterUsername"] == "username"
+ assert new_cluster["Endpoint"]["Port"] == 1234
+ assert new_cluster["EnhancedVpcRouting"] is True
@mock_redshift
@@ -1193,10 +1213,12 @@ def test_create_cluster_with_node_type_from_snapshot():
ClusterIdentifier=original_cluster_identifier,
)
- client.restore_from_cluster_snapshot.when.called_with(
- ClusterIdentifier=original_cluster_identifier,
- SnapshotIdentifier=original_snapshot_identifier,
- ).should.throw(ClientError, "ClusterAlreadyExists")
+ with pytest.raises(ClientError) as client_err:
+ client.restore_from_cluster_snapshot(
+ ClusterIdentifier=original_cluster_identifier,
+ SnapshotIdentifier=original_snapshot_identifier,
+ )
+ assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists"
response = client.restore_from_cluster_snapshot(
ClusterIdentifier=new_cluster_identifier,
@@ -1204,14 +1226,14 @@ def test_create_cluster_with_node_type_from_snapshot():
NodeType="ra3.xlplus",
NumberOfNodes=3,
)
- response["Cluster"]["ClusterStatus"].should.equal("creating")
+ assert response["Cluster"]["ClusterStatus"] == "creating"
response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier)
new_cluster = response["Clusters"][0]
- new_cluster["NodeType"].should.equal("ra3.xlplus")
- new_cluster["NumberOfNodes"].should.equal(3)
- new_cluster["MasterUsername"].should.equal("username")
- new_cluster["EnhancedVpcRouting"].should.equal(True)
+ assert new_cluster["NodeType"] == "ra3.xlplus"
+ assert new_cluster["NumberOfNodes"] == 3
+ assert new_cluster["MasterUsername"] == "username"
+ assert new_cluster["EnhancedVpcRouting"] is True
@mock_redshift
@@ -1238,7 +1260,7 @@ def test_create_cluster_from_snapshot_with_waiter():
SnapshotIdentifier=original_snapshot_identifier,
Port=1234,
)
- response["Cluster"]["ClusterStatus"].should.equal("creating")
+ assert response["Cluster"]["ClusterStatus"] == "creating"
client.get_waiter("cluster_restored").wait(
ClusterIdentifier=new_cluster_identifier,
@@ -1247,18 +1269,22 @@ def test_create_cluster_from_snapshot_with_waiter():
response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier)
new_cluster = response["Clusters"][0]
- new_cluster["NodeType"].should.equal("ds2.xlarge")
- new_cluster["MasterUsername"].should.equal("username")
- new_cluster["EnhancedVpcRouting"].should.equal(True)
- new_cluster["Endpoint"]["Port"].should.equal(1234)
+ assert new_cluster["NodeType"] == "ds2.xlarge"
+ assert new_cluster["MasterUsername"] == "username"
+ assert new_cluster["EnhancedVpcRouting"] is True
+ assert new_cluster["Endpoint"]["Port"] == 1234
@mock_redshift
def test_create_cluster_from_non_existent_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
- client.restore_from_cluster_snapshot.when.called_with(
- ClusterIdentifier="cluster-id", SnapshotIdentifier="non-existent-snapshot"
- ).should.throw(ClientError, "Snapshot non-existent-snapshot not found.")
+ with pytest.raises(ClientError) as client_err:
+ client.restore_from_cluster_snapshot(
+ ClusterIdentifier="cluster-id", SnapshotIdentifier="non-existent-snapshot"
+ )
+ assert client_err.value.response["Error"]["Message"] == (
+ "Snapshot non-existent-snapshot not found."
+ )
@mock_redshift
@@ -1273,10 +1299,10 @@ def test_create_cluster_status_update():
MasterUsername="username",
MasterUserPassword="password",
)
- response["Cluster"]["ClusterStatus"].should.equal("creating")
+ assert response["Cluster"]["ClusterStatus"] == "creating"
response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
- response["Clusters"][0]["ClusterStatus"].should.equal("available")
+ assert response["Clusters"][0]["ClusterStatus"] == "available"
@mock_redshift
@@ -1287,7 +1313,10 @@ def test_describe_tags_with_resource_type():
f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:{cluster_identifier}"
)
snapshot_identifier = "my_snapshot"
- snapshot_arn = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot:{cluster_identifier}/{snapshot_identifier}"
+ snapshot_arn = (
+ f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot"
+ f":{cluster_identifier}/{snapshot_identifier}"
+ )
tag_key = "test-tag-key"
tag_value = "test-tag-value"
@@ -1302,12 +1331,12 @@ def test_describe_tags_with_resource_type():
)
tags_response = client.describe_tags(ResourceType="cluster")
tagged_resources = tags_response["TaggedResources"]
- list(tagged_resources).should.have.length_of(1)
- tagged_resources[0]["ResourceType"].should.equal("cluster")
- tagged_resources[0]["ResourceName"].should.equal(cluster_arn)
+ assert len(list(tagged_resources)) == 1
+ assert tagged_resources[0]["ResourceType"] == "cluster"
+ assert tagged_resources[0]["ResourceName"] == cluster_arn
tag = tagged_resources[0]["Tag"]
- tag["Key"].should.equal(tag_key)
- tag["Value"].should.equal(tag_value)
+ assert tag["Key"] == tag_key
+ assert tag["Value"] == tag_value
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
@@ -1316,12 +1345,12 @@ def test_describe_tags_with_resource_type():
)
tags_response = client.describe_tags(ResourceType="snapshot")
tagged_resources = tags_response["TaggedResources"]
- list(tagged_resources).should.have.length_of(1)
- tagged_resources[0]["ResourceType"].should.equal("snapshot")
- tagged_resources[0]["ResourceName"].should.equal(snapshot_arn)
+ assert len(list(tagged_resources)) == 1
+ assert tagged_resources[0]["ResourceType"] == "snapshot"
+ assert tagged_resources[0]["ResourceName"] == snapshot_arn
tag = tagged_resources[0]["Tag"]
- tag["Key"].should.equal(tag_key)
- tag["Value"].should.equal(tag_value)
+ assert tag["Key"] == tag_key
+ assert tag["Value"] == tag_value
@mock_redshift
@@ -1329,9 +1358,12 @@ def test_describe_tags_cannot_specify_resource_type_and_resource_name():
client = boto3.client("redshift", region_name="us-east-1")
resource_name = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:cluster-id"
resource_type = "cluster"
- client.describe_tags.when.called_with(
- ResourceName=resource_name, ResourceType=resource_type
- ).should.throw(ClientError, "using either an ARN or a resource type")
+ with pytest.raises(ClientError) as client_err:
+ client.describe_tags(ResourceName=resource_name, ResourceType=resource_type)
+ assert (
+ "using either an ARN or a resource type"
+ in client_err.value.response["Error"]["Message"]
+ )
@mock_redshift
@@ -1342,7 +1374,10 @@ def test_describe_tags_with_resource_name():
f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:{cluster_identifier}"
)
snapshot_identifier = "snapshot-id"
- snapshot_arn = f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot:{cluster_identifier}/{snapshot_identifier}"
+ snapshot_arn = (
+ f"arn:aws:redshift:us-east-1:{ACCOUNT_ID}:snapshot"
+ f":{cluster_identifier}/{snapshot_identifier}"
+ )
tag_key = "test-tag-key"
tag_value = "test-tag-value"
@@ -1357,12 +1392,12 @@ def test_describe_tags_with_resource_name():
)
tags_response = client.describe_tags(ResourceName=cluster_arn)
tagged_resources = tags_response["TaggedResources"]
- list(tagged_resources).should.have.length_of(1)
- tagged_resources[0]["ResourceType"].should.equal("cluster")
- tagged_resources[0]["ResourceName"].should.equal(cluster_arn)
+ assert len(list(tagged_resources)) == 1
+ assert tagged_resources[0]["ResourceType"] == "cluster"
+ assert tagged_resources[0]["ResourceName"] == cluster_arn
tag = tagged_resources[0]["Tag"]
- tag["Key"].should.equal(tag_key)
- tag["Value"].should.equal(tag_value)
+ assert tag["Key"] == tag_key
+ assert tag["Value"] == tag_value
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
@@ -1371,12 +1406,12 @@ def test_describe_tags_with_resource_name():
)
tags_response = client.describe_tags(ResourceName=snapshot_arn)
tagged_resources = tags_response["TaggedResources"]
- list(tagged_resources).should.have.length_of(1)
- tagged_resources[0]["ResourceType"].should.equal("snapshot")
- tagged_resources[0]["ResourceName"].should.equal(snapshot_arn)
+ assert len(list(tagged_resources)) == 1
+ assert tagged_resources[0]["ResourceType"] == "snapshot"
+ assert tagged_resources[0]["ResourceName"] == snapshot_arn
tag = tagged_resources[0]["Tag"]
- tag["Key"].should.equal(tag_key)
- tag["Value"].should.equal(tag_value)
+ assert tag["Key"] == tag_key
+ assert tag["Value"] == tag_value
@mock_redshift
@@ -1405,9 +1440,9 @@ def test_create_tags():
client.create_tags(ResourceName=cluster_arn, Tags=tags)
response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = response["Clusters"][0]
- list(cluster["Tags"]).should.have.length_of(num_tags)
+ assert len(list(cluster["Tags"])) == num_tags
response = client.describe_tags(ResourceName=cluster_arn)
- list(response["TaggedResources"]).should.have.length_of(num_tags)
+ assert len(list(response["TaggedResources"])) == num_tags
@mock_redshift
@@ -1439,9 +1474,9 @@ def test_delete_tags():
)
response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = response["Clusters"][0]
- list(cluster["Tags"]).should.have.length_of(1)
+ assert len(list(cluster["Tags"])) == 1
response = client.describe_tags(ResourceName=cluster_arn)
- list(response["TaggedResources"]).should.have.length_of(1)
+ assert len(list(response["TaggedResources"])) == 1
@mock_ec2
@@ -1452,7 +1487,7 @@ def test_describe_tags_all_resource_types():
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/24")
client = boto3.client("redshift", region_name="us-east-1")
response = client.describe_tags()
- list(response["TaggedResources"]).should.have.length_of(0)
+ assert len(list(response["TaggedResources"])) == 0
client.create_cluster_subnet_group(
ClusterSubnetGroupName="my_subnet_group",
Description="This is my subnet group",
@@ -1494,8 +1529,8 @@ def test_describe_tags_all_resource_types():
]
tagged_resources = response["TaggedResources"]
returned_types = [resource["ResourceType"] for resource in tagged_resources]
- list(tagged_resources).should.have.length_of(len(expected_types))
- set(returned_types).should.equal(set(expected_types))
+ assert len(list(tagged_resources)) == len(expected_types)
+ assert set(returned_types) == set(expected_types)
@mock_redshift
@@ -1503,21 +1538,28 @@ def test_tagged_resource_not_found_error():
client = boto3.client("redshift", region_name="us-east-1")
cluster_arn = "arn:aws:redshift:us-east-1::cluster:fake"
- client.describe_tags.when.called_with(ResourceName=cluster_arn).should.throw(
- ClientError, "cluster (fake) not found."
- )
+ with pytest.raises(ClientError) as client_err:
+ client.describe_tags(ResourceName=cluster_arn)
+ assert client_err.value.response["Error"]["Message"] == "cluster (fake) not found."
snapshot_arn = "arn:aws:redshift:us-east-1::snapshot:cluster-id/snap-id"
- client.delete_tags.when.called_with(
- ResourceName=snapshot_arn, TagKeys=["test"]
- ).should.throw(ClientError, "snapshot (snap-id) not found.")
-
- client.describe_tags.when.called_with(ResourceType="cluster").should.throw(
- ClientError, "resource of type 'cluster' not found."
+ with pytest.raises(ClientError) as client_err:
+ client.delete_tags(ResourceName=snapshot_arn, TagKeys=["test"])
+ assert (
+ client_err.value.response["Error"]["Message"] == "snapshot (snap-id) not found."
)
- client.describe_tags.when.called_with(ResourceName="bad:arn").should.throw(
- ClientError, "Tagging is not supported for this type of resource"
+ with pytest.raises(ClientError) as client_err:
+ client.describe_tags(ResourceType="cluster")
+ assert client_err.value.response["Error"]["Message"] == (
+ "resource of type 'cluster' not found."
+ )
+
+ with pytest.raises(ClientError) as client_err:
+ client.describe_tags(ResourceName="bad:arn")
+ assert (
+ "Tagging is not supported for this type of resource"
+ in client_err.value.response["Error"]["Message"]
)
@@ -1537,10 +1579,10 @@ def test_enable_snapshot_copy():
client.enable_snapshot_copy(
ClusterIdentifier="test", DestinationRegion="us-west-2", RetentionPeriod=3
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
- ex.value.response["Error"]["Message"].should.contain(
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
+ assert (
"SnapshotCopyGrantName is required for Snapshot Copy on KMS encrypted clusters."
- )
+ ) in ex.value.response["Error"]["Message"]
with pytest.raises(ClientError) as ex:
client.enable_snapshot_copy(
ClusterIdentifier="test",
@@ -1548,8 +1590,8 @@ def test_enable_snapshot_copy():
RetentionPeriod=3,
SnapshotCopyGrantName="invalid-us-east-1-to-us-east-1",
)
- ex.value.response["Error"]["Code"].should.equal("UnknownSnapshotCopyRegionFault")
- ex.value.response["Error"]["Message"].should.contain("Invalid region us-east-1")
+ assert ex.value.response["Error"]["Code"] == "UnknownSnapshotCopyRegionFault"
+ assert "Invalid region us-east-1" in ex.value.response["Error"]["Message"]
client.enable_snapshot_copy(
ClusterIdentifier="test",
DestinationRegion="us-west-2",
@@ -1558,9 +1600,9 @@ def test_enable_snapshot_copy():
)
response = client.describe_clusters(ClusterIdentifier="test")
cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"]
- cluster_snapshot_copy_status["RetentionPeriod"].should.equal(3)
- cluster_snapshot_copy_status["DestinationRegion"].should.equal("us-west-2")
- cluster_snapshot_copy_status["SnapshotCopyGrantName"].should.equal(
+ assert cluster_snapshot_copy_status["RetentionPeriod"] == 3
+ assert cluster_snapshot_copy_status["DestinationRegion"] == "us-west-2"
+ assert cluster_snapshot_copy_status["SnapshotCopyGrantName"] == (
"copy-us-east-1-to-us-west-2"
)
@@ -1579,8 +1621,8 @@ def test_enable_snapshot_copy_unencrypted():
client.enable_snapshot_copy(ClusterIdentifier="test", DestinationRegion="us-west-2")
response = client.describe_clusters(ClusterIdentifier="test")
cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"]
- cluster_snapshot_copy_status["RetentionPeriod"].should.equal(7)
- cluster_snapshot_copy_status["DestinationRegion"].should.equal("us-west-2")
+ assert cluster_snapshot_copy_status["RetentionPeriod"] == 7
+ assert cluster_snapshot_copy_status["DestinationRegion"] == "us-west-2"
@mock_redshift
@@ -1602,7 +1644,7 @@ def test_disable_snapshot_copy():
)
client.disable_snapshot_copy(ClusterIdentifier="test")
response = client.describe_clusters(ClusterIdentifier="test")
- response["Clusters"][0].shouldnt.contain("ClusterSnapshotCopyStatus")
+ assert "ClusterSnapshotCopyStatus" not in response["Clusters"][0]
@mock_redshift
@@ -1627,7 +1669,7 @@ def test_modify_snapshot_copy_retention_period():
)
response = client.describe_clusters(ClusterIdentifier="test")
cluster_snapshot_copy_status = response["Clusters"][0]["ClusterSnapshotCopyStatus"]
- cluster_snapshot_copy_status["RetentionPeriod"].should.equal(5)
+ assert cluster_snapshot_copy_status["RetentionPeriod"] == 5
@mock_redshift
@@ -1642,9 +1684,9 @@ def test_create_duplicate_cluster_fails():
}
client = boto3.client("redshift", region_name="us-east-1")
client.create_cluster(**kwargs)
- client.create_cluster.when.called_with(**kwargs).should.throw(
- ClientError, "ClusterAlreadyExists"
- )
+ with pytest.raises(ClientError) as client_err:
+ client.create_cluster(**kwargs)
+ assert client_err.value.response["Error"]["Code"] == "ClusterAlreadyExists"
@mock_redshift
@@ -1653,8 +1695,8 @@ def test_delete_cluster_with_final_snapshot():
with pytest.raises(ClientError) as ex:
client.delete_cluster(ClusterIdentifier="non-existent")
- ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
- ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
+ assert ex.value.response["Error"]["Code"] == "ClusterNotFound"
+ assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"])
cluster_identifier = "my_cluster"
client.create_cluster(
@@ -1670,10 +1712,10 @@ def test_delete_cluster_with_final_snapshot():
client.delete_cluster(
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=False
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination")
- ex.value.response["Error"]["Message"].should.contain(
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination"
+ assert (
"FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified."
- )
+ ) in ex.value.response["Error"]["Message"]
snapshot_identifier = "my_snapshot"
client.delete_cluster(
@@ -1683,14 +1725,14 @@ def test_delete_cluster_with_final_snapshot():
)
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- resp["Snapshots"].should.have.length_of(1)
- resp["Snapshots"][0]["SnapshotIdentifier"].should.equal(snapshot_identifier)
- resp["Snapshots"][0]["SnapshotType"].should.equal("manual")
+ assert len(resp["Snapshots"]) == 1
+ assert resp["Snapshots"][0]["SnapshotIdentifier"] == snapshot_identifier
+ assert resp["Snapshots"][0]["SnapshotType"] == "manual"
with pytest.raises(ClientError) as ex:
client.describe_clusters(ClusterIdentifier=cluster_identifier)
- ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
- ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
+ assert ex.value.response["Error"]["Code"] == "ClusterNotFound"
+ assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"])
@mock_redshift
@@ -1709,40 +1751,39 @@ def test_delete_cluster_without_final_snapshot():
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True
)
cluster = cluster_response["Cluster"]
- cluster["ClusterIdentifier"].should.equal(cluster_identifier)
- cluster["NodeType"].should.equal("ds2.xlarge")
+ assert cluster["ClusterIdentifier"] == cluster_identifier
+ assert cluster["NodeType"] == "ds2.xlarge"
# Bug: This is what AWS returns
- # cluster["ClusterStatus"].should.equal("deleting")
- cluster["MasterUsername"].should.equal("user")
- cluster["DBName"].should.equal("test")
+ # assert cluster["ClusterStatus"] == "deleting"
+ assert cluster["MasterUsername"] == "user"
+ assert cluster["DBName"] == "test"
endpoint = cluster["Endpoint"]
- endpoint["Address"].should.match(
- f"{cluster_identifier}.[a-z0-9]+.us-east-1.redshift.amazonaws.com"
+ assert re.match(
+ f"{cluster_identifier}.[a-z0-9]+.us-east-1.redshift.amazonaws.com",
+ endpoint["Address"],
)
- endpoint["Port"].should.equal(5439)
- cluster["AutomatedSnapshotRetentionPeriod"].should.equal(1)
- cluster["ClusterParameterGroups"].should.have.length_of(1)
+ assert endpoint["Port"] == 5439
+ assert cluster["AutomatedSnapshotRetentionPeriod"] == 1
+ assert len(cluster["ClusterParameterGroups"]) == 1
param_group = cluster["ClusterParameterGroups"][0]
- param_group.should.equal(
- {
- "ParameterGroupName": "default.redshift-1.0",
- "ParameterApplyStatus": "in-sync",
- }
- )
- cluster["AvailabilityZone"].should.equal("us-east-1a")
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["NumberOfNodes"].should.equal(1)
- cluster["Encrypted"].should.equal(False)
- cluster["EnhancedVpcRouting"].should.equal(False)
+ assert param_group == {
+ "ParameterGroupName": "default.redshift-1.0",
+ "ParameterApplyStatus": "in-sync",
+ }
+ assert cluster["AvailabilityZone"] == "us-east-1a"
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["NumberOfNodes"] == 1
+ assert cluster["Encrypted"] is False
+ assert cluster["EnhancedVpcRouting"] is False
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- resp["Snapshots"].should.have.length_of(0)
+ assert len(resp["Snapshots"]) == 0
with pytest.raises(ClientError) as ex:
client.describe_clusters(ClusterIdentifier=cluster_identifier)
- ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
- ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
+ assert ex.value.response["Error"]["Code"] == "ClusterNotFound"
+ assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"])
@mock_redshift
@@ -1756,26 +1797,26 @@ def test_resize_cluster():
MasterUsername="user",
MasterUserPassword="password",
)
- resp["Cluster"]["NumberOfNodes"].should.equal(1)
+ assert resp["Cluster"]["NumberOfNodes"] == 1
client.modify_cluster(
ClusterIdentifier="test", ClusterType="multi-node", NumberOfNodes=2
)
resp = client.describe_clusters(ClusterIdentifier="test")
- resp["Clusters"][0]["NumberOfNodes"].should.equal(2)
+ assert resp["Clusters"][0]["NumberOfNodes"] == 2
client.modify_cluster(ClusterIdentifier="test", ClusterType="single-node")
resp = client.describe_clusters(ClusterIdentifier="test")
- resp["Clusters"][0]["NumberOfNodes"].should.equal(1)
+ assert resp["Clusters"][0]["NumberOfNodes"] == 1
with pytest.raises(ClientError) as ex:
client.modify_cluster(
ClusterIdentifier="test", ClusterType="multi-node", NumberOfNodes=1
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination")
- ex.value.response["Error"]["Message"].should.contain(
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination"
+ assert (
"Number of nodes for cluster type multi-node must be greater than or equal to 2"
- )
+ ) in ex.value.response["Error"]["Message"]
with pytest.raises(ClientError) as ex:
client.modify_cluster(
@@ -1783,8 +1824,8 @@ def test_resize_cluster():
ClusterType="invalid-cluster-type",
NumberOfNodes=1,
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
- ex.value.response["Error"]["Message"].should.contain("Invalid cluster type")
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
+ assert "Invalid cluster type" in ex.value.response["Error"]["Message"]
@mock_redshift
@@ -1795,8 +1836,8 @@ def test_get_cluster_credentials_non_existent_cluster_and_user():
client.get_cluster_credentials(
ClusterIdentifier="non-existent", DbUser="some_user"
)
- ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
- ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
+ assert ex.value.response["Error"]["Code"] == "ClusterNotFound"
+ assert re.match(r"Cluster .+ not found.", ex.value.response["Error"]["Message"])
@mock_redshift
@@ -1818,18 +1859,20 @@ def test_get_cluster_credentials_invalid_duration():
client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=899
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
- ex.value.response["Error"]["Message"].should.contain(
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
+ assert (
"Token duration must be between 900 and 3600 seconds"
+ in ex.value.response["Error"]["Message"]
)
with pytest.raises(ClientError) as ex:
client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3601
)
- ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
- ex.value.response["Error"]["Message"].should.contain(
+ assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
+ assert (
"Token duration must be between 900 and 3600 seconds"
+ in ex.value.response["Error"]["Message"]
)
@@ -1848,30 +1891,30 @@ def test_get_cluster_credentials():
)
expected_expiration = time.mktime(
- (datetime.datetime.now() + datetime.timedelta(0, 900)).timetuple()
+ (datetime.datetime.now(tzutc()) + datetime.timedelta(0, 900)).timetuple()
)
db_user = "some_user"
response = client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser=db_user
)
- response["DbUser"].should.equal(f"IAM:{db_user}")
+ assert response["DbUser"] == f"IAM:{db_user}"
assert time.mktime((response["Expiration"]).timetuple()) == pytest.approx(
expected_expiration
)
- response["DbPassword"].should.have.length_of(32)
+ assert len(response["DbPassword"]) == 32
response = client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser=db_user, AutoCreate=True
)
- response["DbUser"].should.equal(f"IAMA:{db_user}")
+ assert response["DbUser"] == f"IAMA:{db_user}"
response = client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser="some_other_user", AutoCreate=False
)
- response["DbUser"].should.equal("IAM:some_other_user")
+ assert response["DbUser"] == "IAM:some_other_user"
expected_expiration = time.mktime(
- (datetime.datetime.now() + datetime.timedelta(0, 3000)).timetuple()
+ (datetime.datetime.now(tzutc()) + datetime.timedelta(0, 3000)).timetuple()
)
response = client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3000
@@ -1893,17 +1936,17 @@ def test_pause_cluster():
MasterUserPassword="password",
)
cluster = response["Cluster"]
- cluster["ClusterIdentifier"].should.equal("test")
+ assert cluster["ClusterIdentifier"] == "test"
response = client.pause_cluster(ClusterIdentifier="test")
cluster = response["Cluster"]
- cluster["ClusterIdentifier"].should.equal("test")
+ assert cluster["ClusterIdentifier"] == "test"
# Verify this call returns all properties
- cluster["NodeType"].should.equal("ds2.xlarge")
- cluster["ClusterStatus"].should.equal("paused")
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["Endpoint"]["Port"].should.equal(5439)
+ assert cluster["NodeType"] == "ds2.xlarge"
+ assert cluster["ClusterStatus"] == "paused"
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["Endpoint"]["Port"] == 5439
@mock_redshift
@@ -1913,8 +1956,8 @@ def test_pause_unknown_cluster():
with pytest.raises(ClientError) as exc:
client.pause_cluster(ClusterIdentifier="test")
err = exc.value.response["Error"]
- err["Code"].should.equal("ClusterNotFound")
- err["Message"].should.equal("Cluster test not found.")
+ assert err["Code"] == "ClusterNotFound"
+ assert err["Message"] == "Cluster test not found."
@mock_redshift
@@ -1932,13 +1975,13 @@ def test_resume_cluster():
client.pause_cluster(ClusterIdentifier="test")
response = client.resume_cluster(ClusterIdentifier="test")
cluster = response["Cluster"]
- cluster["ClusterIdentifier"].should.equal("test")
+ assert cluster["ClusterIdentifier"] == "test"
# Verify this call returns all properties
- cluster["NodeType"].should.equal("ds2.xlarge")
- cluster["ClusterStatus"].should.equal("available")
- cluster["ClusterVersion"].should.equal("1.0")
- cluster["AllowVersionUpgrade"].should.equal(True)
- cluster["Endpoint"]["Port"].should.equal(5439)
+ assert cluster["NodeType"] == "ds2.xlarge"
+ assert cluster["ClusterStatus"] == "available"
+ assert cluster["ClusterVersion"] == "1.0"
+ assert cluster["AllowVersionUpgrade"] is True
+ assert cluster["Endpoint"]["Port"] == 5439
@mock_redshift
@@ -1948,5 +1991,5 @@ def test_resume_unknown_cluster():
with pytest.raises(ClientError) as exc:
client.resume_cluster(ClusterIdentifier="test")
err = exc.value.response["Error"]
- err["Code"].should.equal("ClusterNotFound")
- err["Message"].should.equal("Cluster test not found.")
+ assert err["Code"] == "ClusterNotFound"
+ assert err["Message"] == "Cluster test not found."
diff --git a/tests/test_redshift/test_redshift_cloudformation.py b/tests/test_redshift/test_redshift_cloudformation.py
index fdad3093a..837ad9092 100644
--- a/tests/test_redshift/test_redshift_cloudformation.py
+++ b/tests/test_redshift/test_redshift_cloudformation.py
@@ -1,6 +1,6 @@
-import boto3
import json
-import sure # noqa # pylint: disable=unused-import
+
+import boto3
from moto import mock_cloudformation, mock_ec2, mock_redshift
from tests.test_cloudformation.fixtures import redshift
@@ -33,20 +33,20 @@ def test_redshift_stack():
cluster_res = redshift_conn.describe_clusters()
clusters = cluster_res["Clusters"]
- clusters.should.have.length_of(1)
+ assert len(clusters) == 1
cluster = clusters[0]
- cluster["DBName"].should.equal("mydb")
- cluster["NumberOfNodes"].should.equal(2)
- cluster["NodeType"].should.equal("dw1.xlarge")
- cluster["MasterUsername"].should.equal("myuser")
- cluster["Endpoint"]["Port"].should.equal(5439)
- cluster["VpcSecurityGroups"].should.have.length_of(1)
+ assert cluster["DBName"] == "mydb"
+ assert cluster["NumberOfNodes"] == 2
+ assert cluster["NodeType"] == "dw1.xlarge"
+ assert cluster["MasterUsername"] == "myuser"
+ assert cluster["Endpoint"]["Port"] == 5439
+ assert len(cluster["VpcSecurityGroups"]) == 1
security_group_id = cluster["VpcSecurityGroups"][0]["VpcSecurityGroupId"]
groups = ec2.describe_security_groups(GroupIds=[security_group_id])[
"SecurityGroups"
]
- groups.should.have.length_of(1)
+ assert len(groups) == 1
group = groups[0]
- group["IpPermissions"].should.have.length_of(1)
- group["IpPermissions"][0]["IpRanges"][0]["CidrIp"].should.equal("10.0.0.1/16")
+ assert len(group["IpPermissions"]) == 1
+ assert group["IpPermissions"][0]["IpRanges"][0]["CidrIp"] == "10.0.0.1/16"
diff --git a/tests/test_redshift/test_server.py b/tests/test_redshift/test_server.py
index 463f4e46e..f55ea3cd8 100644
--- a/tests/test_redshift/test_server.py
+++ b/tests/test_redshift/test_server.py
@@ -1,15 +1,13 @@
-import sure # noqa # pylint: disable=unused-import
+"""Test the different server responses."""
import json
+import re
+
import pytest
import xmltodict
import moto.server as server
from moto import mock_redshift
-"""
-Test the different server responses
-"""
-
@mock_redshift
def test_describe_clusters():
@@ -19,7 +17,7 @@ def test_describe_clusters():
res = test_client.get("/?Action=DescribeClusters")
result = res.data.decode("utf-8")
- result.should.contain("")
+ assert "" in result
@mock_redshift
@@ -31,9 +29,9 @@ def test_describe_clusters_with_json_content_type():
result = json.loads(res.data.decode("utf-8"))
del result["DescribeClustersResponse"]["ResponseMetadata"]
- result.should.equal(
- {"DescribeClustersResponse": {"DescribeClustersResult": {"Clusters": []}}}
- )
+ assert result == {
+ "DescribeClustersResponse": {"DescribeClustersResult": {"Clusters": []}}
+ }
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -61,30 +59,29 @@ def test_create_cluster(is_json):
result = xmltodict.parse(result, dict_constructor=dict)
del result["CreateClusterResponse"]["ResponseMetadata"]
- result.should.have.key("CreateClusterResponse")
- result["CreateClusterResponse"].should.have.key("CreateClusterResult")
- result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster")
+ assert "CreateClusterResponse" in result
+ assert "CreateClusterResult" in result["CreateClusterResponse"]
+ assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"]
result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"]
- result.should.have.key("MasterUsername").equal("masteruser")
- result.should.have.key("MasterUserPassword").equal("****")
- result.should.have.key("ClusterVersion").equal("1.0")
- result.should.have.key("ClusterSubnetGroupName").equal(None)
- result.should.have.key("AvailabilityZone").equal("us-east-1a")
- result.should.have.key("ClusterStatus").equal("creating")
- result.should.have.key("NumberOfNodes").equal(1 if is_json else "1")
- result.should.have.key("PubliclyAccessible").equal(None)
- result.should.have.key("Encrypted").equal(None)
- result.should.have.key("DBName").equal("dev")
- result.should.have.key("NodeType").equal("ds2.xlarge")
- result.should.have.key("ClusterIdentifier").equal("examplecluster")
- result.should.have.key("Endpoint").should.have.key("Address").match(
- "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com"
+ assert result["MasterUsername"] == "masteruser"
+ assert result["MasterUserPassword"] == "****"
+ assert result["ClusterVersion"] == "1.0"
+ assert result["ClusterSubnetGroupName"] is None
+ assert result["AvailabilityZone"] == "us-east-1a"
+ assert result["ClusterStatus"] == "creating"
+ assert result["NumberOfNodes"] == (1 if is_json else "1")
+ assert result["PubliclyAccessible"] is None
+ assert result["Encrypted"] is None
+ assert result["DBName"] == "dev"
+ assert result["NodeType"] == "ds2.xlarge"
+ assert result["ClusterIdentifier"] == "examplecluster"
+ assert re.match(
+ "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com",
+ result["Endpoint"]["Address"],
)
- result.should.have.key("Endpoint").should.have.key("Port").equal(
- 5439 if is_json else "5439"
- )
- result.should.have.key("ClusterCreateTime")
+ assert result["Endpoint"]["Port"] == (5439 if is_json else "5439")
+ assert "ClusterCreateTime" in result
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -122,37 +119,34 @@ def test_create_cluster_multiple_params(is_json):
result = xmltodict.parse(result, dict_constructor=dict)
del result["CreateClusterResponse"]["ResponseMetadata"]
- result.should.have.key("CreateClusterResponse")
- result["CreateClusterResponse"].should.have.key("CreateClusterResult")
- result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster")
+ assert "CreateClusterResponse" in result
+ assert "CreateClusterResult" in result["CreateClusterResponse"]
+ assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"]
result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"]
- result.should.have.key("MasterUsername").equal("masteruser")
- result.should.have.key("MasterUserPassword").equal("****")
- result.should.have.key("ClusterVersion").equal("2.0")
- result.should.have.key("ClusterSubnetGroupName").equal(None)
- result.should.have.key("AvailabilityZone").equal("us-east-1a")
- result.should.have.key("ClusterStatus").equal("creating")
- result.should.have.key("NumberOfNodes").equal(3 if is_json else "3")
- result.should.have.key("PubliclyAccessible").equal(None)
- result.should.have.key("Encrypted").equal("True")
- result.should.have.key("DBName").equal("testdb")
- result.should.have.key("NodeType").equal("ds2.xlarge")
- result.should.have.key("ClusterIdentifier").equal("examplecluster")
- result.should.have.key("Endpoint").should.have.key("Address").match(
- "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com"
+ assert result["MasterUsername"] == "masteruser"
+ assert result["MasterUserPassword"] == "****"
+ assert result["ClusterVersion"] == "2.0"
+ assert result["ClusterSubnetGroupName"] is None
+ assert result["AvailabilityZone"] == "us-east-1a"
+ assert result["ClusterStatus"] == "creating"
+ assert result["NumberOfNodes"] == (3 if is_json else "3")
+ assert result["PubliclyAccessible"] is None
+ assert result["Encrypted"] == "True"
+ assert result["DBName"] == "testdb"
+ assert result["NodeType"] == "ds2.xlarge"
+ assert result["ClusterIdentifier"] == "examplecluster"
+ assert re.match(
+ "examplecluster.[a-z0-9]+.us-east-1.redshift.amazonaws.com",
+ result["Endpoint"]["Address"],
)
- result.should.have.key("Endpoint").should.have.key("Port").equal(
- 1234 if is_json else "1234"
- )
- result.should.have.key("ClusterCreateTime")
- result.should.have.key("Tags")
+ assert result["Endpoint"]["Port"] == (1234 if is_json else "1234")
+ assert "ClusterCreateTime" in result
+ assert "Tags" in result
tags = result["Tags"]
if not is_json:
tags = tags["item"]
- tags.should.equal(
- [{"Key": "key1", "Value": "val1"}, {"Key": "key2", "Value": "val2"}]
- )
+ assert tags == [{"Key": "key1", "Value": "val1"}, {"Key": "key2", "Value": "val2"}]
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -185,18 +179,16 @@ def test_create_and_describe_clusters(is_json):
result = xmltodict.parse(result, dict_constructor=dict)
del result["DescribeClustersResponse"]["ResponseMetadata"]
- result.should.have.key("DescribeClustersResponse")
- result["DescribeClustersResponse"].should.have.key("DescribeClustersResult")
- result["DescribeClustersResponse"]["DescribeClustersResult"].should.have.key(
- "Clusters"
- )
+ assert "DescribeClustersResponse" in result
+ assert "DescribeClustersResult" in result["DescribeClustersResponse"]
+ assert "Clusters" in result["DescribeClustersResponse"]["DescribeClustersResult"]
result = result["DescribeClustersResponse"]["DescribeClustersResult"]["Clusters"]
if not is_json:
result = result["item"]
- result.should.have.length_of(2)
+ assert len(result) == 2
for cluster in result:
- cluster_names.should.contain(cluster["ClusterIdentifier"])
+ assert cluster["ClusterIdentifier"] in cluster_names
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -233,12 +225,12 @@ def test_create_and_describe_cluster_security_group(is_json):
groups = groups["item"]
descriptions = [g["Description"] for g in groups]
- descriptions.should.contain("desc for csg1")
- descriptions.should.contain("desc for csg2")
+ assert "desc for csg1" in descriptions
+ assert "desc for csg2" in descriptions
# Describe single SG
describe_params = (
- "/?Action=DescribeClusterSecurityGroups" "&ClusterSecurityGroupName=csg1"
+ "/?Action=DescribeClusterSecurityGroups&ClusterSecurityGroupName=csg1"
)
if is_json:
describe_params += "&ContentType=JSON"
@@ -255,10 +247,10 @@ def test_create_and_describe_cluster_security_group(is_json):
]["ClusterSecurityGroups"]
if is_json:
- groups.should.have.length_of(1)
- groups[0]["ClusterSecurityGroupName"].should.equal("csg1")
+ assert len(groups) == 1
+ assert groups[0]["ClusterSecurityGroupName"] == "csg1"
else:
- groups["item"]["ClusterSecurityGroupName"].should.equal("csg1")
+ assert groups["item"]["ClusterSecurityGroupName"] == "csg1"
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -268,13 +260,13 @@ def test_describe_unknown_cluster_security_group(is_json):
test_client = backend.test_client()
describe_params = (
- "/?Action=DescribeClusterSecurityGroups" "&ClusterSecurityGroupName=unknown"
+ "/?Action=DescribeClusterSecurityGroups&ClusterSecurityGroupName=unknown"
)
if is_json:
describe_params += "&ContentType=JSON"
res = test_client.get(describe_params)
- res.status_code.should.equal(400)
+ assert res.status_code == 400
if is_json:
response = json.loads(res.data.decode("utf-8"))
@@ -284,8 +276,8 @@ def test_describe_unknown_cluster_security_group(is_json):
]
error = response["Error"]
- error["Code"].should.equal("ClusterSecurityGroupNotFound")
- error["Message"].should.equal("Security group unknown not found.")
+ assert error["Code"] == "ClusterSecurityGroupNotFound"
+ assert error["Message"] == "Security group unknown not found."
@pytest.mark.parametrize("is_json", [True, False], ids=["JSON", "XML"])
@@ -312,15 +304,15 @@ def test_create_cluster_with_security_group(is_json):
response = xmltodict.parse(response, dict_constructor=dict)
del response["CreateClusterSecurityGroupResponse"]["ResponseMetadata"]
- response.should.have.key("CreateClusterSecurityGroupResponse")
+ assert "CreateClusterSecurityGroupResponse" in response
response = response["CreateClusterSecurityGroupResponse"]
- response.should.have.key("CreateClusterSecurityGroupResult")
+ assert "CreateClusterSecurityGroupResult" in response
result = response["CreateClusterSecurityGroupResult"]
- result.should.have.key("ClusterSecurityGroup")
+ assert "ClusterSecurityGroup" in result
sg = result["ClusterSecurityGroup"]
- sg.should.have.key("ClusterSecurityGroupName").being.equal(csg)
- sg.should.have.key("Description").being.equal("desc for " + csg)
- sg.should.have.key("EC2SecurityGroups").being.equal([] if is_json else None)
+ assert sg["ClusterSecurityGroupName"] == csg
+ assert sg["Description"] == "desc for " + csg
+ assert sg["EC2SecurityGroups"] == ([] if is_json else None)
# Create Cluster with these security groups
create_params = (
@@ -344,17 +336,15 @@ def test_create_cluster_with_security_group(is_json):
result = xmltodict.parse(result, dict_constructor=dict)
del result["CreateClusterResponse"]["ResponseMetadata"]
- result.should.have.key("CreateClusterResponse")
- result["CreateClusterResponse"].should.have.key("CreateClusterResult")
- result["CreateClusterResponse"]["CreateClusterResult"].should.have.key("Cluster")
+ assert "CreateClusterResponse" in result
+ assert "CreateClusterResult" in result["CreateClusterResponse"]
+ assert "Cluster" in result["CreateClusterResponse"]["CreateClusterResult"]
result = result["CreateClusterResponse"]["CreateClusterResult"]["Cluster"]
security_groups = result["ClusterSecurityGroups"]
if not is_json:
security_groups = security_groups["item"]
- security_groups.should.have.length_of(2)
+ assert len(security_groups) == 2
for csg in security_group_names:
- security_groups.should.contain(
- {"ClusterSecurityGroupName": csg, "Status": "active"}
- )
+ assert {"ClusterSecurityGroupName": csg, "Status": "active"} in security_groups