From 1fe69d55a50b404d7c0a59707e88ce03c5ded784 Mon Sep 17 00:00:00 2001 From: kbalk <7536198+kbalk@users.noreply.github.com> Date: Thu, 17 Aug 2023 03:42:19 -0400 Subject: [PATCH] Techdebt: Replace sure with regular assertions in RDS (#6683) --- tests/test_rds/test_db_cluster_param_group.py | 2 +- tests/test_rds/test_filters.py | 109 +- tests/test_rds/test_global_clusters.py | 18 +- tests/test_rds/test_rds.py | 931 +++++++++--------- tests/test_rds/test_rds_cloudformation.py | 39 +- tests/test_rds/test_rds_clusters.py | 289 +++--- .../test_rds_clusters_with_instances.py | 8 +- .../test_rds/test_rds_event_subscriptions.py | 37 +- tests/test_rds/test_rds_export_tasks.py | 70 +- tests/test_rds/test_server.py | 13 +- tests/test_rds/test_utils.py | 63 +- 11 files changed, 810 insertions(+), 769 deletions(-) diff --git a/tests/test_rds/test_db_cluster_param_group.py b/tests/test_rds/test_db_cluster_param_group.py index 87a19f2cf..7bb7ae8a0 100644 --- a/tests/test_rds/test_db_cluster_param_group.py +++ b/tests/test_rds/test_db_cluster_param_group.py @@ -1,7 +1,7 @@ import boto3 +from botocore.exceptions import ClientError import pytest -from botocore.exceptions import ClientError from moto import mock_rds from moto.core import DEFAULT_ACCOUNT_ID diff --git a/tests/test_rds/test_filters.py b/tests/test_rds/test_filters.py index 529b32516..b9920e5e2 100644 --- a/tests/test_rds/test_filters.py +++ b/tests/test_rds/test_filters.py @@ -1,7 +1,6 @@ import boto3 -import pytest -import sure # noqa # pylint: disable=unused-import from botocore.exceptions import ClientError +import pytest from moto import mock_rds @@ -37,8 +36,8 @@ class TestDBInstanceFilters: self.client.describe_db_instances( Filters=[{"Name": "invalid-filter-name", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ex.value.response["Error"]["Message"] == ( "Unrecognized filter name: invalid-filter-name" ) @@ -47,8 +46,8 @@ class TestDBInstanceFilters: self.client.describe_db_instances( Filters=[{"Name": "db-instance-id", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") - ex.value.response["Error"]["Message"].should.contain("must not be empty") + assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination" + assert "must not be empty" in ex.value.response["Error"]["Message"] def test_db_cluster_id_filter(self): resp = self.client.describe_db_instances() @@ -57,8 +56,8 @@ class TestDBInstanceFilters: db_instances = self.client.describe_db_instances( Filters=[{"Name": "db-cluster-id", "Values": [db_cluster_identifier]}] ).get("DBInstances") - db_instances.should.have.length_of(1) - db_instances[0]["DBClusterIdentifier"].should.equal(db_cluster_identifier) + assert len(db_instances) == 1 + assert db_instances[0]["DBClusterIdentifier"] == db_cluster_identifier def test_db_instance_id_filter(self): resp = self.client.describe_db_instances() @@ -67,8 +66,8 @@ class TestDBInstanceFilters: db_instances = self.client.describe_db_instances( Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}] ).get("DBInstances") - db_instances.should.have.length_of(1) - db_instances[0]["DBInstanceIdentifier"].should.equal(db_instance_identifier) + assert len(db_instances) == 1 + assert db_instances[0]["DBInstanceIdentifier"] == db_instance_identifier def test_db_instance_id_filter_works_with_arns(self): resp = self.client.describe_db_instances() @@ -77,8 +76,8 @@ class TestDBInstanceFilters: db_instances = self.client.describe_db_instances( Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}] ).get("DBInstances") - db_instances.should.have.length_of(1) - db_instances[0]["DBInstanceArn"].should.equal(db_instance_arn) + assert len(db_instances) == 1 + assert db_instances[0]["DBInstanceArn"] == db_instance_arn def test_dbi_resource_id_filter(self): resp = self.client.describe_db_instances() @@ -88,19 +87,19 @@ class TestDBInstanceFilters: Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}] ).get("DBInstances") for db_instance in db_instances: - db_instance["DbiResourceId"].should.equal(dbi_resource_identifier) + assert db_instance["DbiResourceId"] == dbi_resource_identifier def test_engine_filter(self): db_instances = self.client.describe_db_instances( Filters=[{"Name": "engine", "Values": ["postgres"]}] ).get("DBInstances") for db_instance in db_instances: - db_instance["Engine"].should.equal("postgres") + assert db_instance["Engine"] == "postgres" db_instances = self.client.describe_db_instances( Filters=[{"Name": "engine", "Values": ["oracle"]}] ).get("DBInstances") - db_instances.should.have.length_of(0) + assert len(db_instances) == 0 def test_multiple_filters(self): resp = self.client.describe_db_instances( @@ -115,9 +114,9 @@ class TestDBInstanceFilters: returned_identifiers = [ db["DBInstanceIdentifier"] for db in resp["DBInstances"] ] - returned_identifiers.should.have.length_of(2) - "db-instance-0".should.be.within(returned_identifiers) - "db-instance-3".should.be.within(returned_identifiers) + assert len(returned_identifiers) == 2 + assert "db-instance-0" in returned_identifiers + assert "db-instance-3" in returned_identifiers def test_invalid_db_instance_identifier_with_exclusive_filter(self): # Passing a non-existent DBInstanceIdentifier will not raise an error @@ -126,8 +125,8 @@ class TestDBInstanceFilters: DBInstanceIdentifier="non-existent", Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}], ) - resp["DBInstances"].should.have.length_of(1) - resp["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-instance-1") + assert len(resp["DBInstances"]) == 1 + assert resp["DBInstances"][0]["DBInstanceIdentifier"] == "db-instance-1" def test_invalid_db_instance_identifier_with_non_matching_filter(self): # Passing a non-existent DBInstanceIdentifier will raise an error if @@ -137,8 +136,8 @@ class TestDBInstanceFilters: DBInstanceIdentifier="non-existent", Filters=[{"Name": "engine", "Values": ["mysql"]}], ) - ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound" + assert ex.value.response["Error"]["Message"] == ( "DBInstance non-existent not found." ) @@ -155,8 +154,8 @@ class TestDBInstanceFilters: returned_identifiers = [ db["DBInstanceIdentifier"] for db in resp["DBInstances"] ] - "db-instance-0".should_not.be.within(returned_identifiers) - "db-instance-1".should.be.within(returned_identifiers) + assert "db-instance-0" not in returned_identifiers + assert "db-instance-1" in returned_identifiers def test_valid_db_instance_identifier_with_inclusive_filter(self): # Passing a valid DBInstanceIdentifier with a filter it matches but also @@ -171,8 +170,8 @@ class TestDBInstanceFilters: returned_identifiers = [ db["DBInstanceIdentifier"] for db in resp["DBInstances"] ] - "db-instance-0".should.be.within(returned_identifiers) - "db-instance-1".should.be.within(returned_identifiers) + assert "db-instance-0" in returned_identifiers + assert "db-instance-1" in returned_identifiers def test_valid_db_instance_identifier_with_non_matching_filter(self): # Passing a valid DBInstanceIdentifier will raise an error if the @@ -182,8 +181,8 @@ class TestDBInstanceFilters: DBInstanceIdentifier="db-instance-0", Filters=[{"Name": "engine", "Values": ["postgres"]}], ) - ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound" + assert ex.value.response["Error"]["Message"] == ( "DBInstance db-instance-0 not found." ) @@ -224,8 +223,8 @@ class TestDBSnapshotFilters: self.client.describe_db_snapshots( Filters=[{"Name": "invalid-filter-name", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ex.value.response["Error"]["Message"] == ( "Unrecognized filter name: invalid-filter-name" ) @@ -234,15 +233,15 @@ class TestDBSnapshotFilters: self.client.describe_db_snapshots( Filters=[{"Name": "db-snapshot-id", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") - ex.value.response["Error"]["Message"].should.contain("must not be empty") + assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination" + assert "must not be empty" in ex.value.response["Error"]["Message"] def test_db_snapshot_id_filter(self): snapshots = self.client.describe_db_snapshots( Filters=[{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-0"]}] ).get("DBSnapshots") - snapshots.should.have.length_of(1) - snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-1-snapshot-0") + assert len(snapshots) == 1 + assert snapshots[0]["DBSnapshotIdentifier"] == "db-instance-1-snapshot-0" def test_db_instance_id_filter(self): resp = self.client.describe_db_instances() @@ -252,7 +251,7 @@ class TestDBSnapshotFilters: Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}] ).get("DBSnapshots") for snapshot in snapshots: - snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier) + assert snapshot["DBInstanceIdentifier"] == db_instance_identifier def test_db_instance_id_filter_works_with_arns(self): resp = self.client.describe_db_instances() @@ -263,7 +262,7 @@ class TestDBSnapshotFilters: Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}] ).get("DBSnapshots") for snapshot in snapshots: - snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier) + assert snapshot["DBInstanceIdentifier"] == db_instance_identifier def test_dbi_resource_id_filter(self): resp = self.client.describe_db_instances() @@ -273,19 +272,19 @@ class TestDBSnapshotFilters: Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}] ).get("DBSnapshots") for snapshot in snapshots: - snapshot["DbiResourceId"].should.equal(dbi_resource_identifier) + assert snapshot["DbiResourceId"] == dbi_resource_identifier def test_engine_filter(self): snapshots = self.client.describe_db_snapshots( Filters=[{"Name": "engine", "Values": ["postgres"]}] ).get("DBSnapshots") for snapshot in snapshots: - snapshot["Engine"].should.equal("postgres") + assert snapshot["Engine"] == "postgres" snapshots = self.client.describe_db_snapshots( Filters=[{"Name": "engine", "Values": ["oracle"]}] ).get("DBSnapshots") - snapshots.should.have.length_of(0) + assert len(snapshots) == 0 def test_snapshot_type_filter(self): snapshots = self.client.describe_db_snapshots( @@ -310,8 +309,8 @@ class TestDBSnapshotFilters: {"Name": "engine", "Values": ["mysql"]}, ] ).get("DBSnapshots") - snapshots.should.have.length_of(1) - snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-0-snapshot-1") + assert len(snapshots) == 1 + assert snapshots[0]["DBSnapshotIdentifier"] == "db-instance-0-snapshot-1" def test_invalid_snapshot_id_with_db_instance_id_and_filter(self): # Passing a non-existent DBSnapshotIdentifier will return an empty list @@ -321,7 +320,7 @@ class TestDBSnapshotFilters: DBInstanceIdentifier="a-db-instance-identifier", Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}], ) - resp["DBSnapshots"].should.have.length_of(0) + assert len(resp["DBSnapshots"]) == 0 def test_invalid_snapshot_id_with_non_matching_filter(self): # Passing a non-existent DBSnapshotIdentifier will raise an error if @@ -331,8 +330,8 @@ class TestDBSnapshotFilters: DBSnapshotIdentifier="non-existent", Filters=[{"Name": "engine", "Values": ["oracle"]}], ) - ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "DBSnapshotNotFound" + assert ex.value.response["Error"]["Message"] == ( "DBSnapshot non-existent not found." ) @@ -347,8 +346,8 @@ class TestDBSnapshotFilters: {"Name": "engine", "Values": ["postgres"]}, ], ) - resp["DBSnapshots"].should.have.length_of(1) - resp["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal( + assert len(resp["DBSnapshots"]) == 1 + assert resp["DBSnapshots"][0]["DBSnapshotIdentifier"] == ( "db-instance-1-snapshot-1" ) @@ -367,9 +366,9 @@ class TestDBSnapshotFilters: ], ).get("DBSnapshots") returned_identifiers = [ss["DBSnapshotIdentifier"] for ss in snapshots] - returned_identifiers.should.have.length_of(2) - "db-instance-0-snapshot-0".should.be.within(returned_identifiers) - "db-instance-1-snapshot-1".should.be.within(returned_identifiers) + assert len(returned_identifiers) == 2 + assert "db-instance-0-snapshot-0" in returned_identifiers + assert "db-instance-1-snapshot-1" in returned_identifiers def test_valid_snapshot_id_with_non_matching_filter(self): # Passing a valid DBSnapshotIdentifier will raise an error if the @@ -379,8 +378,8 @@ class TestDBSnapshotFilters: DBSnapshotIdentifier="db-instance-0-snapshot-0", Filters=[{"Name": "engine", "Values": ["postgres"]}], ) - ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "DBSnapshotNotFound" + assert ex.value.response["Error"]["Message"] == ( "DBSnapshot db-instance-0-snapshot-0 not found." ) @@ -422,8 +421,8 @@ class TestDBClusterSnapshotFilters: self.client.describe_db_cluster_snapshots( Filters=[{"Name": "invalid-filter-name", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") - ex.value.response["Error"]["Message"].should.equal( + assert ex.value.response["Error"]["Code"] == "InvalidParameterValue" + assert ex.value.response["Error"]["Message"] == ( "Unrecognized filter name: invalid-filter-name" ) @@ -432,8 +431,8 @@ class TestDBClusterSnapshotFilters: self.client.describe_db_cluster_snapshots( Filters=[{"Name": "snapshot-type", "Values": []}] ) - ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") - ex.value.response["Error"]["Message"].should.contain("must not be empty") + assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination" + assert "must not be empty" in ex.value.response["Error"]["Message"] def test_snapshot_type_filter(self): snapshots = self.client.describe_db_cluster_snapshots( diff --git a/tests/test_rds/test_global_clusters.py b/tests/test_rds/test_global_clusters.py index 3b51f9c12..f02432407 100644 --- a/tests/test_rds/test_global_clusters.py +++ b/tests/test_rds/test_global_clusters.py @@ -1,7 +1,7 @@ import boto3 +from botocore.exceptions import ClientError import pytest -from botocore.exceptions import ClientError from moto import mock_rds from moto.core import DEFAULT_ACCOUNT_ID @@ -65,7 +65,8 @@ def test_global_cluster_members(): @mock_rds def test_create_global_cluster_from_regular_cluster(): # WHEN create_db_cluster is called - # AND create_global_cluster is called with SourceDBClusterIdentifier set as the earlier created db cluster + # AND create_global_cluster is called with SourceDBClusterIdentifier + # set as the earlier created db cluster # THEN that db cluster is elevated to a global cluster # AND it still shows up when calling describe_db_clusters client = boto3.client("rds", "us-east-1") @@ -184,9 +185,10 @@ def test_create_global_cluster_from_regular_cluster__and_specify_engine(): ) err = exc.value.response["Error"] assert err["Code"] == "InvalidParameterCombination" - assert ( - err["Message"] - == "When creating global cluster from existing db cluster, value for engineName should not be specified since it will be inherited from source cluster" + assert err["Message"] == ( + "When creating global cluster from existing db cluster, value for " + "engineName should not be specified since it will be inherited " + "from source cluster" ) @@ -195,11 +197,13 @@ def test_delete_non_global_cluster(): # WHEN a global cluster contains a regular cluster # AND we attempt to delete the global cluster # THEN we get an error message - # An error occurs (InvalidGlobalClusterStateFault) when calling the DeleteGlobalCluster operation: Global Cluster arn:aws:rds::486285699788:global-cluster:g1 is not empty + # An error occurs (InvalidGlobalClusterStateFault) when calling the + # DeleteGlobalCluster operation: Global Cluster + # arn:aws:rds::486285699788:global-cluster:g1 is not empty client = boto3.client("rds", "us-east-1") client.create_global_cluster(GlobalClusterIdentifier="gc1", Engine="aurora-mysql") - client.create_db_cluster( + _ = client.create_db_cluster( DBClusterIdentifier="dbci", GlobalClusterIdentifier="gc1", Engine="mysql", diff --git a/tests/test_rds/test_rds.py b/tests/test_rds/test_rds.py index 1e331d8bd..4fafa9d40 100644 --- a/tests/test_rds/test_rds.py +++ b/tests/test_rds/test_rds.py @@ -1,7 +1,9 @@ -from botocore.exceptions import ClientError +import datetime + import boto3 +from botocore.exceptions import ClientError import pytest -import sure # noqa # pylint: disable=unused-import + from moto import mock_ec2, mock_kms, mock_rds from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID @@ -26,26 +28,26 @@ def test_create_database(): EnableCloudwatchLogsExports=["audit", "error"], ) db_instance = database["DBInstance"] - db_instance["AllocatedStorage"].should.equal(10) - db_instance["DBInstanceClass"].should.equal("db.m1.small") - db_instance["LicenseModel"].should.equal("license-included") - db_instance["MasterUsername"].should.equal("root") - db_instance["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal("my_sg") - db_instance["DBInstanceArn"].should.equal( + assert db_instance["AllocatedStorage"] == 10 + assert db_instance["DBInstanceClass"] == "db.m1.small" + assert db_instance["LicenseModel"] == "license-included" + assert db_instance["MasterUsername"] == "root" + assert db_instance["DBSecurityGroups"][0]["DBSecurityGroupName"] == "my_sg" + assert db_instance["DBInstanceArn"] == ( f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:db:db-master-1" ) - db_instance["DBInstanceStatus"].should.equal("available") - db_instance["DBName"].should.equal("staging-postgres") - db_instance["DBInstanceIdentifier"].should.equal("db-master-1") - db_instance["IAMDatabaseAuthenticationEnabled"].should.equal(False) - db_instance["DbiResourceId"].should.contain("db-") - db_instance["CopyTagsToSnapshot"].should.equal(False) - db_instance["InstanceCreateTime"].should.be.a("datetime.datetime") - db_instance["VpcSecurityGroups"][0]["VpcSecurityGroupId"].should.equal("sg-123456") - db_instance["DeletionProtection"].should.equal(False) - db_instance["EnabledCloudwatchLogsExports"].should.equal(["audit", "error"]) - db_instance["Endpoint"]["Port"].should.equal(1234) - db_instance["DbInstancePort"].should.equal(1234) + assert db_instance["DBInstanceStatus"] == "available" + assert db_instance["DBName"] == "staging-postgres" + assert db_instance["DBInstanceIdentifier"] == "db-master-1" + assert db_instance["IAMDatabaseAuthenticationEnabled"] is False + assert "db-" in db_instance["DbiResourceId"] + assert db_instance["CopyTagsToSnapshot"] is False + assert isinstance(db_instance["InstanceCreateTime"], datetime.datetime) + assert db_instance["VpcSecurityGroups"][0]["VpcSecurityGroupId"] == "sg-123456" + assert db_instance["DeletionProtection"] is False + assert db_instance["EnabledCloudwatchLogsExports"] == ["audit", "error"] + assert db_instance["Endpoint"]["Port"] == 1234 + assert db_instance["DbInstancePort"] == 1234 @mock_rds @@ -60,8 +62,8 @@ def test_database_with_deletion_protection_cannot_be_deleted(): DeletionProtection=True, ) db_instance = database["DBInstance"] - db_instance["DBInstanceClass"].should.equal("db.m1.small") - db_instance["DeletionProtection"].should.equal(True) + assert db_instance["DBInstanceClass"] == "db.m1.small" + assert db_instance["DeletionProtection"] is True @mock_rds @@ -74,10 +76,10 @@ def test_create_database_no_allocated_storage(): DBInstanceClass="db.m1.small", ) db_instance = database["DBInstance"] - db_instance["Engine"].should.equal("postgres") - db_instance["StorageType"].should.equal("gp2") - db_instance["AllocatedStorage"].should.equal(20) - db_instance["PreferredMaintenanceWindow"].should.equal("wed:06:38-wed:07:08") + assert db_instance["Engine"] == "postgres" + assert db_instance["StorageType"] == "gp2" + assert db_instance["AllocatedStorage"] == 20 + assert db_instance["PreferredMaintenanceWindow"] == "wed:06:38-wed:07:08" @mock_rds @@ -92,8 +94,8 @@ def test_create_database_invalid_preferred_maintenance_window_more_24_hours(): PreferredMaintenanceWindow="mon:16:00-tue:17:00", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal("Maintenance window must be less than 24 hours.") + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == "Maintenance window must be less than 24 hours." @mock_rds @@ -108,8 +110,8 @@ def test_create_database_invalid_preferred_maintenance_window_less_30_mins(): PreferredMaintenanceWindow="mon:16:00-mon:16:05", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal("The maintenance window must be at least 30 minutes.") + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == "The maintenance window must be at least 30 minutes." @mock_rds @@ -124,8 +126,8 @@ def test_create_database_invalid_preferred_maintenance_window_value(): PreferredMaintenanceWindow="sim:16:00-mon:16:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain("Invalid day:hour:minute") + assert err["Code"] == "InvalidParameterValue" + assert "Invalid day:hour:minute" in err["Message"] @mock_rds @@ -140,10 +142,11 @@ def test_create_database_invalid_preferred_maintenance_window_format(): PreferredMaintenanceWindow="mon:16tue:17:00", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "Should be specified as a range ddd:hh24:mi-ddd:hh24:mi (24H Clock UTC). Example: Sun:23:45-Mon:00:15" - ) + assert err["Code"] == "InvalidParameterValue" + assert ( + "Should be specified as a range ddd:hh24:mi-ddd:hh24:mi " + "(24H Clock UTC). Example: Sun:23:45-Mon:00:15" + ) in err["Message"] @mock_rds @@ -159,9 +162,9 @@ def test_create_database_preferred_backup_window_overlap_no_spill(): PreferredBackupWindow="20:00-20:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "The backup window and maintenance window must not overlap." + assert err["Code"] == "InvalidParameterValue" + assert ( + "The backup window and maintenance window must not overlap." in err["Message"] ) @@ -178,9 +181,9 @@ def test_create_database_preferred_backup_window_overlap_maintenance_window_spil PreferredBackupWindow="00:00-00:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "The backup window and maintenance window must not overlap." + assert err["Code"] == "InvalidParameterValue" + assert ( + "The backup window and maintenance window must not overlap." in err["Message"] ) @@ -197,9 +200,9 @@ def test_create_database_preferred_backup_window_overlap_backup_window_spill(): PreferredBackupWindow="23:50-00:20", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "The backup window and maintenance window must not overlap." + assert err["Code"] == "InvalidParameterValue" + assert ( + "The backup window and maintenance window must not overlap." in err["Message"] ) @@ -216,9 +219,9 @@ def test_create_database_preferred_backup_window_overlap_both_spill(): PreferredBackupWindow="23:50-00:20", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "The backup window and maintenance window must not overlap." + assert err["Code"] == "InvalidParameterValue" + assert ( + "The backup window and maintenance window must not overlap." in err["Message"] ) @@ -233,8 +236,8 @@ def test_create_database_valid_preferred_maintenance_window_format(): PreferredMaintenanceWindow="sun:16:00-sun:16:30", ) db_instance = database["DBInstance"] - db_instance["DBInstanceClass"].should.equal("db.m1.small") - db_instance["PreferredMaintenanceWindow"].should.equal("sun:16:00-sun:16:30") + assert db_instance["DBInstanceClass"] == "db.m1.small" + assert db_instance["PreferredMaintenanceWindow"] == "sun:16:00-sun:16:30" @mock_rds @@ -248,21 +251,22 @@ def test_create_database_valid_preferred_maintenance_window_uppercase_format(): PreferredMaintenanceWindow="MON:16:00-TUE:01:30", ) db_instance = database["DBInstance"] - db_instance["DBInstanceClass"].should.equal("db.m1.small") - db_instance["PreferredMaintenanceWindow"].should.equal("mon:16:00-tue:01:30") + assert db_instance["DBInstanceClass"] == "db.m1.small" + assert db_instance["PreferredMaintenanceWindow"] == "mon:16:00-tue:01:30" @mock_rds def test_create_database_non_existing_option_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_instance.when.called_with( - DBInstanceIdentifier="db-master-1", - AllocatedStorage=10, - Engine="postgres", - DBName="staging-postgres", - DBInstanceClass="db.m1.small", - OptionGroupName="non-existing", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_db_instance( + DBInstanceIdentifier="db-master-1", + AllocatedStorage=10, + Engine="postgres", + DBName="staging-postgres", + DBInstanceClass="db.m1.small", + OptionGroupName="non-existing", + ) @mock_rds @@ -283,10 +287,10 @@ def test_create_database_with_option_group(): OptionGroupName="my-og", ) db_instance = database["DBInstance"] - db_instance["AllocatedStorage"].should.equal(10) - db_instance["DBInstanceClass"].should.equal("db.m1.small") - db_instance["DBName"].should.equal("staging-postgres") - db_instance["OptionGroupMemberships"][0]["OptionGroupName"].should.equal("my-og") + assert db_instance["AllocatedStorage"] == 10 + assert db_instance["DBInstanceClass"] == "db.m1.small" + assert db_instance["DBName"] == "staging-postgres" + assert db_instance["OptionGroupMemberships"][0]["OptionGroupName"] == "my-og" @mock_rds @@ -307,22 +311,23 @@ def test_stop_database(): mydb = conn.describe_db_instances( DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] - mydb["DBInstanceStatus"].should.equal("available") + assert mydb["DBInstanceStatus"] == "available" # test stopping database should shutdown response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - response["DBInstance"]["DBInstanceStatus"].should.equal("stopped") + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert response["DBInstance"]["DBInstanceStatus"] == "stopped" # test rdsclient error when trying to stop an already stopped database - conn.stop_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) - # test stopping a stopped database with snapshot should error and no snapshot should exist for that call - conn.stop_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"], - DBSnapshotIdentifier="rocky4570-rds-snap", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + # test stopping a stopped database with snapshot should error and no + # snapshot should exist for that call + with pytest.raises(ClientError): + conn.stop_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"], + DBSnapshotIdentifier="rocky4570-rds-snap", + ) response = conn.describe_db_snapshots() - response["DBSnapshots"].should.equal([]) + assert response["DBSnapshots"] == [] @mock_rds @@ -343,39 +348,37 @@ def test_start_database(): mydb = conn.describe_db_instances( DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] - mydb["DBInstanceStatus"].should.equal("available") + assert mydb["DBInstanceStatus"] == "available" # test starting an already started database should error - conn.start_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) - # stop and test start - should go from stopped to available, create snapshot and check snapshot + with pytest.raises(ClientError): + conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) + # stop and test start - should go from stopped to available, create + # snapshot and check snapshot response = conn.stop_db_instance( DBInstanceIdentifier=mydb["DBInstanceIdentifier"], DBSnapshotIdentifier="rocky4570-rds-snap", ) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - response["DBInstance"]["DBInstanceStatus"].should.equal("stopped") + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert response["DBInstance"]["DBInstanceStatus"] == "stopped" response = conn.describe_db_snapshots() - response["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal( - "rocky4570-rds-snap" - ) + assert response["DBSnapshots"][0]["DBSnapshotIdentifier"] == "rocky4570-rds-snap" response = conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - response["DBInstance"]["DBInstanceStatus"].should.equal("available") + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert response["DBInstance"]["DBInstanceStatus"] == "available" # starting database should not remove snapshot response = conn.describe_db_snapshots() - response["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal( - "rocky4570-rds-snap" - ) - # test stopping database, create snapshot with existing snapshot already created should throw error - conn.stop_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"], - DBSnapshotIdentifier="rocky4570-rds-snap", - ).should.throw(ClientError) + assert response["DBSnapshots"][0]["DBSnapshotIdentifier"] == "rocky4570-rds-snap" + # test stopping database, create snapshot with existing snapshot already + # created should throw error + with pytest.raises(ClientError): + conn.stop_db_instance( + DBInstanceIdentifier=mydb["DBInstanceIdentifier"], + DBSnapshotIdentifier="rocky4570-rds-snap", + ) # test stopping database not invoking snapshot should succeed. response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - response["DBInstance"]["DBInstanceStatus"].should.equal("stopped") + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert response["DBInstance"]["DBInstanceStatus"] == "stopped" @mock_rds @@ -398,15 +401,13 @@ def test_fail_to_stop_multi_az_and_sqlserver(): mydb = conn.describe_db_instances( DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] - mydb["DBInstanceStatus"].should.equal("available") + assert mydb["DBInstanceStatus"] == "available" # multi-az databases arent allowed to be shutdown at this time. - conn.stop_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # multi-az databases arent allowed to be started up at this time. - conn.start_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) @mock_rds @@ -429,11 +430,11 @@ def test_stop_multi_az_postgres(): mydb = conn.describe_db_instances( DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] - mydb["DBInstanceStatus"].should.equal("available") + assert mydb["DBInstanceStatus"] == "available" response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) - response["DBInstance"]["DBInstanceStatus"].should.equal("stopped") + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert response["DBInstance"]["DBInstanceStatus"] == "stopped" @mock_rds @@ -461,15 +462,13 @@ def test_fail_to_stop_readreplica(): mydb = conn.describe_db_instances( DBInstanceIdentifier=replica["DBInstance"]["DBInstanceIdentifier"] )["DBInstances"][0] - mydb["DBInstanceStatus"].should.equal("available") + assert mydb["DBInstanceStatus"] == "available" # read-replicas are not allowed to be stopped at this time. - conn.stop_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) # read-replicas are not allowed to be started at this time. - conn.start_db_instance.when.called_with( - DBInstanceIdentifier=mydb["DBInstanceIdentifier"] - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"]) @mock_rds @@ -477,7 +476,7 @@ def test_get_databases(): conn = boto3.client("rds", region_name=DEFAULT_REGION) instances = conn.describe_db_instances() - list(instances["DBInstances"]).should.have.length_of(0) + assert len(list(instances["DBInstances"])) == 0 conn.create_db_instance( DBInstanceIdentifier="db-master-1", @@ -501,20 +500,20 @@ def test_get_databases(): DeletionProtection=True, ) instances = conn.describe_db_instances() - list(instances["DBInstances"]).should.have.length_of(2) + assert len(list(instances["DBInstances"])) == 2 instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - list(instances["DBInstances"]).should.have.length_of(1) - instances["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-master-1") - instances["DBInstances"][0]["DeletionProtection"].should.equal(False) - instances["DBInstances"][0]["DBInstanceArn"].should.equal( + assert len(list(instances["DBInstances"])) == 1 + assert instances["DBInstances"][0]["DBInstanceIdentifier"] == "db-master-1" + assert instances["DBInstances"][0]["DeletionProtection"] is False + assert instances["DBInstances"][0]["DBInstanceArn"] == ( f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:db:db-master-1" ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2") - instances["DBInstances"][0]["DeletionProtection"].should.equal(True) - instances["DBInstances"][0]["Endpoint"]["Port"].should.equal(1234) - instances["DBInstances"][0]["DbInstancePort"].should.equal(1234) + assert instances["DBInstances"][0]["DeletionProtection"] is True + assert instances["DBInstances"][0]["Endpoint"]["Port"] == 1234 + assert instances["DBInstances"][0]["DbInstancePort"] == 1234 @mock_rds @@ -531,22 +530,21 @@ def test_get_databases_paginated(): ) resp = conn.describe_db_instances() - resp["DBInstances"].should.have.length_of(50) - resp["Marker"].should.equal(resp["DBInstances"][-1]["DBInstanceIdentifier"]) + assert len(resp["DBInstances"]) == 50 + assert resp["Marker"] == resp["DBInstances"][-1]["DBInstanceIdentifier"] resp2 = conn.describe_db_instances(Marker=resp["Marker"]) - resp2["DBInstances"].should.have.length_of(1) + assert len(resp2["DBInstances"]) == 1 resp3 = conn.describe_db_instances(MaxRecords=100) - resp3["DBInstances"].should.have.length_of(51) + assert len(resp3["DBInstances"]) == 51 @mock_rds def test_describe_non_existent_database(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.describe_db_instances.when.called_with( - DBInstanceIdentifier="not-a-db" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_instances(DBInstanceIdentifier="not-a-db") @mock_rds @@ -563,7 +561,7 @@ def test_modify_db_instance(): DBSecurityGroups=["my_sg"], ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances["DBInstances"][0]["AllocatedStorage"].should.equal(10) + assert instances["DBInstances"][0]["AllocatedStorage"] == 10 conn.modify_db_instance( DBInstanceIdentifier="db-master-1", AllocatedStorage=20, @@ -571,13 +569,14 @@ def test_modify_db_instance(): VpcSecurityGroupIds=["sg-123456"], ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances["DBInstances"][0]["AllocatedStorage"].should.equal(20) - instances["DBInstances"][0]["PreferredMaintenanceWindow"].should.equal( + assert instances["DBInstances"][0]["AllocatedStorage"] == 20 + assert instances["DBInstances"][0]["PreferredMaintenanceWindow"] == ( "wed:06:38-wed:07:08" ) - instances["DBInstances"][0]["VpcSecurityGroups"][0][ - "VpcSecurityGroupId" - ].should.equal("sg-123456") + assert ( + instances["DBInstances"][0]["VpcSecurityGroups"][0]["VpcSecurityGroupId"] + == "sg-123456" + ) @mock_rds @@ -594,11 +593,12 @@ def test_modify_db_instance_not_existent_db_parameter_group_name(): DBSecurityGroups=["my_sg"], ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances["DBInstances"][0]["AllocatedStorage"].should.equal(10) - conn.modify_db_instance.when.called_with( - DBInstanceIdentifier="db-master-1", - DBParameterGroupName="test-sqlserver-se-2017", - ).should.throw(ClientError) + assert instances["DBInstances"][0]["AllocatedStorage"] == 10 + with pytest.raises(ClientError): + conn.modify_db_instance( + DBInstanceIdentifier="db-master-1", + DBParameterGroupName="test-sqlserver-se-2017", + ) @mock_rds @@ -620,7 +620,7 @@ def test_modify_db_instance_valid_preferred_maintenance_window(): PreferredMaintenanceWindow="sun:16:00-sun:16:30", ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances["DBInstances"][0]["PreferredMaintenanceWindow"].should.equal( + assert instances["DBInstances"][0]["PreferredMaintenanceWindow"] == ( "sun:16:00-sun:16:30" ) @@ -644,7 +644,7 @@ def test_modify_db_instance_valid_preferred_maintenance_window_uppercase(): PreferredMaintenanceWindow="SUN:16:00-SUN:16:30", ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - instances["DBInstances"][0]["PreferredMaintenanceWindow"].should.equal( + assert instances["DBInstances"][0]["PreferredMaintenanceWindow"] == ( "sun:16:00-sun:16:30" ) @@ -668,8 +668,8 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_more_than_24_ho PreferredMaintenanceWindow="sun:16:00-sat:16:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal("Maintenance window must be less than 24 hours.") + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == "Maintenance window must be less than 24 hours." @mock_rds @@ -691,8 +691,8 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_less_than_30_mi PreferredMaintenanceWindow="sun:16:00-sun:16:10", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal("The maintenance window must be at least 30 minutes.") + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == "The maintenance window must be at least 30 minutes." @mock_rds @@ -714,8 +714,8 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_value(): PreferredMaintenanceWindow="sin:16:00-sun:16:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain("Invalid day:hour:minute value") + assert err["Code"] == "InvalidParameterValue" + assert "Invalid day:hour:minute value" in err["Message"] @mock_rds @@ -737,10 +737,11 @@ def test_modify_db_instance_invalid_preferred_maintenance_window_format(): PreferredMaintenanceWindow="sun:16:00sun:16:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.contain( - "Should be specified as a range ddd:hh24:mi-ddd:hh24:mi (24H Clock UTC). Example: Sun:23:45-Mon:00:15" - ) + assert err["Code"] == "InvalidParameterValue" + assert ( + "Should be specified as a range ddd:hh24:mi-ddd:hh24:mi " + "(24H Clock UTC). Example: Sun:23:45-Mon:00:15" + ) in err["Message"] @mock_rds @@ -763,9 +764,9 @@ def test_modify_db_instance_maintenance_backup_window_no_spill(): PreferredBackupWindow="15:50-16:20", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( - "The backup window and maintenance window must not overlap." + assert err["Code"] == "InvalidParameterValue" + assert ( + err["Message"] == "The backup window and maintenance window must not overlap." ) @@ -789,8 +790,8 @@ def test_modify_db_instance_maintenance_backup_window_maintenance_spill(): PreferredBackupWindow="00:00-00:30", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( "The backup window and maintenance window must not overlap." ) @@ -815,8 +816,8 @@ def test_modify_db_instance_maintenance_backup_window_backup_spill(): PreferredBackupWindow="23:50-00:20", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( "The backup window and maintenance window must not overlap." ) @@ -841,8 +842,8 @@ def test_modify_db_instance_maintenance_backup_window_both_spill(): PreferredBackupWindow="23:20-00:20", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( "The backup window and maintenance window must not overlap." ) @@ -861,28 +862,27 @@ def test_rename_db_instance(): DBSecurityGroups=["my_sg"], ) instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - list(instances["DBInstances"]).should.have.length_of(1) - conn.describe_db_instances.when.called_with( - DBInstanceIdentifier="db-master-2" - ).should.throw(ClientError) + assert len(list(instances["DBInstances"])) == 1 + with pytest.raises(ClientError): + conn.describe_db_instances(DBInstanceIdentifier="db-master-2") conn.modify_db_instance( DBInstanceIdentifier="db-master-1", NewDBInstanceIdentifier="db-master-2", ApplyImmediately=True, ) - conn.describe_db_instances.when.called_with( - DBInstanceIdentifier="db-master-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_instances(DBInstanceIdentifier="db-master-1") instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2") - list(instances["DBInstances"]).should.have.length_of(1) + assert len(list(instances["DBInstances"])) == 1 @mock_rds def test_modify_non_existent_database(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.modify_db_instance.when.called_with( - DBInstanceIdentifier="not-a-db", AllocatedStorage=20, ApplyImmediately=True - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.modify_db_instance( + DBInstanceIdentifier="not-a-db", AllocatedStorage=20, ApplyImmediately=True + ) @mock_rds @@ -899,22 +899,21 @@ def test_reboot_db_instance(): DBSecurityGroups=["my_sg"], ) database = conn.reboot_db_instance(DBInstanceIdentifier="db-master-1") - database["DBInstance"]["DBInstanceIdentifier"].should.equal("db-master-1") + assert database["DBInstance"]["DBInstanceIdentifier"] == "db-master-1" @mock_rds def test_reboot_non_existent_database(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.reboot_db_instance.when.called_with( - DBInstanceIdentifier="not-a-db" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.reboot_db_instance(DBInstanceIdentifier="not-a-db") @mock_rds def test_delete_database(): conn = boto3.client("rds", region_name=DEFAULT_REGION) instances = conn.describe_db_instances() - list(instances["DBInstances"]).should.have.length_of(0) + assert len(list(instances["DBInstances"])) == 0 conn.create_db_instance( DBInstanceIdentifier="db-1", AllocatedStorage=10, @@ -926,7 +925,7 @@ def test_delete_database(): DBSecurityGroups=["my_sg"], ) instances = conn.describe_db_instances() - list(instances["DBInstances"]).should.have.length_of(1) + assert len(list(instances["DBInstances"])) == 1 conn.delete_db_instance( DBInstanceIdentifier="db-1", @@ -934,7 +933,7 @@ def test_delete_database(): ) instances = conn.describe_db_instances() - list(instances["DBInstances"]).should.have.length_of(0) + assert len(list(instances["DBInstances"])) == 0 # Saved the snapshot snapshot = conn.describe_db_snapshots(DBInstanceIdentifier="db-1")["DBSnapshots"][0] @@ -945,9 +944,10 @@ def test_delete_database(): @mock_rds def test_create_db_snapshots(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_snapshot.when.called_with( - DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_db_snapshot( + DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" + ) conn.create_db_instance( DBInstanceIdentifier="db-primary-1", @@ -965,19 +965,20 @@ def test_create_db_snapshots(): DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1" ).get("DBSnapshot") - snapshot.get("Engine").should.equal("postgres") - snapshot.get("DBInstanceIdentifier").should.equal("db-primary-1") - snapshot.get("DBSnapshotIdentifier").should.equal("g-1") + assert snapshot.get("Engine") == "postgres" + assert snapshot.get("DBInstanceIdentifier") == "db-primary-1" + assert snapshot.get("DBSnapshotIdentifier") == "g-1" result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) - result["TagList"].should.equal([]) + assert result["TagList"] == [] @mock_rds def test_create_db_snapshots_copy_tags(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_snapshot.when.called_with( - DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_db_snapshot( + DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" + ) conn.create_db_instance( DBInstanceIdentifier="db-primary-1", @@ -997,13 +998,14 @@ def test_create_db_snapshots_copy_tags(): DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1" ).get("DBSnapshot") - snapshot.get("Engine").should.equal("postgres") - snapshot.get("DBInstanceIdentifier").should.equal("db-primary-1") - snapshot.get("DBSnapshotIdentifier").should.equal("g-1") + assert snapshot.get("Engine") == "postgres" + assert snapshot.get("DBInstanceIdentifier") == "db-primary-1" + assert snapshot.get("DBSnapshotIdentifier") == "g-1" result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"]) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -1030,10 +1032,11 @@ def test_create_db_snapshots_with_tags(): snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get( "DBSnapshots" ) - snapshots[0].get("DBSnapshotIdentifier").should.equal("g-1") - snapshots[0].get("TagList").should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert snapshots[0].get("DBSnapshotIdentifier") == "g-1" + assert snapshots[0].get("TagList") == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -1060,11 +1063,11 @@ def test_copy_db_snapshots(): SourceDBSnapshotIdentifier="snapshot-1", TargetDBSnapshotIdentifier="snapshot-2" ).get("DBSnapshot") - target_snapshot.get("Engine").should.equal("postgres") - target_snapshot.get("DBInstanceIdentifier").should.equal("db-primary-1") - target_snapshot.get("DBSnapshotIdentifier").should.equal("snapshot-2") + assert target_snapshot.get("Engine") == "postgres" + assert target_snapshot.get("DBInstanceIdentifier") == "db-primary-1" + assert target_snapshot.get("DBSnapshotIdentifier") == "snapshot-2" result = conn.list_tags_for_resource(ResourceName=target_snapshot["DBSnapshotArn"]) - result["TagList"].should.equal([]) + assert result["TagList"] == [] @mock_rds @@ -1095,7 +1098,7 @@ def test_describe_db_snapshots(): by_snapshot_id = conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get( "DBSnapshots" ) - by_snapshot_id.should.equal(by_database_id) + assert by_snapshot_id == by_database_id snapshot = by_snapshot_id[0] assert snapshot == created @@ -1106,7 +1109,7 @@ def test_describe_db_snapshots(): snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get( "DBSnapshots" ) - snapshots.should.have.length_of(2) + assert len(snapshots) == 2 @mock_rds @@ -1155,11 +1158,12 @@ def test_delete_db_snapshot(): DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get("DBSnapshots")[0] + _ = conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get( + "DBSnapshots" + )[0] conn.delete_db_snapshot(DBSnapshotIdentifier="snapshot-1") - conn.describe_db_snapshots.when.called_with( - DBSnapshotIdentifier="snapshot-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1") @mock_rds @@ -1175,7 +1179,7 @@ def test_restore_db_instance_from_db_snapshot(): MasterUserPassword="hunter2", DBSecurityGroups=["my_sg"], ) - conn.describe_db_instances()["DBInstances"].should.have.length_of(1) + assert len(conn.describe_db_instances()["DBInstances"]) == 1 conn.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" @@ -1185,24 +1189,29 @@ def test_restore_db_instance_from_db_snapshot(): new_instance = conn.restore_db_instance_from_db_snapshot( DBInstanceIdentifier="db-restore-1", DBSnapshotIdentifier="snapshot-1" )["DBInstance"] - new_instance["DBInstanceIdentifier"].should.equal("db-restore-1") - new_instance["DBInstanceClass"].should.equal("db.m1.small") - new_instance["StorageType"].should.equal("gp2") - new_instance["Engine"].should.equal("postgres") - new_instance["DBName"].should.equal("staging-postgres") - new_instance["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert new_instance["DBInstanceIdentifier"] == "db-restore-1" + assert new_instance["DBInstanceClass"] == "db.m1.small" + assert new_instance["StorageType"] == "gp2" + assert new_instance["Engine"] == "postgres" + assert new_instance["DBName"] == "staging-postgres" + assert new_instance["DBParameterGroups"][0]["DBParameterGroupName"] == ( "default.postgres9.3" ) - new_instance["DBSecurityGroups"].should.equal( - [{"DBSecurityGroupName": "my_sg", "Status": "active"}] - ) - new_instance["Endpoint"]["Port"].should.equal(5432) + assert new_instance["DBSecurityGroups"] == [ + {"DBSecurityGroupName": "my_sg", "Status": "active"} + ] + assert new_instance["Endpoint"]["Port"] == 5432 # Verify it exists - conn.describe_db_instances()["DBInstances"].should.have.length_of(2) - conn.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ - "DBInstances" - ].should.have.length_of(1) + assert len(conn.describe_db_instances()["DBInstances"]) == 2 + assert ( + len( + conn.describe_db_instances(DBInstanceIdentifier="db-restore-1")[ + "DBInstances" + ] + ) + == 1 + ) @mock_rds @@ -1219,7 +1228,7 @@ def test_restore_db_instance_from_db_snapshot_and_override_params(): Port=1234, DBSecurityGroups=["my_sg"], ) - conn.describe_db_instances()["DBInstances"].should.have.length_of(1) + assert len(conn.describe_db_instances()["DBInstances"]) == 1 conn.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) @@ -1231,17 +1240,17 @@ def test_restore_db_instance_from_db_snapshot_and_override_params(): Port=10000, VpcSecurityGroupIds=["new_vpc"], )["DBInstance"] - new_instance["DBInstanceIdentifier"].should.equal("db-restore-1") - new_instance["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert new_instance["DBInstanceIdentifier"] == "db-restore-1" + assert new_instance["DBParameterGroups"][0]["DBParameterGroupName"] == ( "default.postgres9.3" ) - new_instance["DBSecurityGroups"].should.equal( - [{"DBSecurityGroupName": "my_sg", "Status": "active"}] - ) - new_instance["VpcSecurityGroups"].should.equal( - [{"VpcSecurityGroupId": "new_vpc", "Status": "active"}] - ) - new_instance["Endpoint"]["Port"].should.equal(10000) + assert new_instance["DBSecurityGroups"] == [ + {"DBSecurityGroupName": "my_sg", "Status": "active"} + ] + assert new_instance["VpcSecurityGroups"] == [ + {"VpcSecurityGroupId": "new_vpc", "Status": "active"} + ] + assert new_instance["Endpoint"]["Port"] == 10000 @mock_rds @@ -1253,13 +1262,13 @@ def test_create_option_group(): MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - option_group["OptionGroup"]["OptionGroupName"].should.equal("test") - option_group["OptionGroup"]["EngineName"].should.equal("mysql") - option_group["OptionGroup"]["OptionGroupDescription"].should.equal( + assert option_group["OptionGroup"]["OptionGroupName"] == "test" + assert option_group["OptionGroup"]["EngineName"] == "mysql" + assert option_group["OptionGroup"]["OptionGroupDescription"] == ( "test option group" ) - option_group["OptionGroup"]["MajorEngineVersion"].should.equal("5.6") - option_group["OptionGroup"]["OptionGroupArn"].should.equal( + assert option_group["OptionGroup"]["MajorEngineVersion"] == "5.6" + assert option_group["OptionGroup"]["OptionGroupArn"] == ( f"arn:aws:rds:us-west-2:{ACCOUNT_ID}:og:test" ) @@ -1267,34 +1276,37 @@ def test_create_option_group(): @mock_rds def test_create_option_group_bad_engine_name(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group.when.called_with( - OptionGroupName="test", - EngineName="invalid_engine", - MajorEngineVersion="5.6", - OptionGroupDescription="test invalid engine", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_option_group( + OptionGroupName="test", + EngineName="invalid_engine", + MajorEngineVersion="5.6", + OptionGroupDescription="test invalid engine", + ) @mock_rds def test_create_option_group_bad_engine_major_version(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group.when.called_with( - OptionGroupName="test", - EngineName="mysql", - MajorEngineVersion="6.6.6", - OptionGroupDescription="test invalid engine version", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_option_group( + OptionGroupName="test", + EngineName="mysql", + MajorEngineVersion="6.6.6", + OptionGroupDescription="test invalid engine version", + ) @mock_rds def test_create_option_group_empty_description(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_option_group.when.called_with( - OptionGroupName="test", - EngineName="mysql", - MajorEngineVersion="5.6", - OptionGroupDescription="", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_option_group( + OptionGroupName="test", + EngineName="mysql", + MajorEngineVersion="5.6", + OptionGroupDescription="", + ) @mock_rds @@ -1306,12 +1318,13 @@ def test_create_option_group_duplicate(): MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - conn.create_option_group.when.called_with( - OptionGroupName="test", - EngineName="mysql", - MajorEngineVersion="5.6", - OptionGroupDescription="test option group", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_option_group( + OptionGroupName="test", + EngineName="mysql", + MajorEngineVersion="5.6", + OptionGroupDescription="test option group", + ) @mock_rds @@ -1324,15 +1337,14 @@ def test_describe_option_group(): OptionGroupDescription="test option group", ) option_groups = conn.describe_option_groups(OptionGroupName="test") - option_groups["OptionGroupsList"][0]["OptionGroupName"].should.equal("test") + assert option_groups["OptionGroupsList"][0]["OptionGroupName"] == "test" @mock_rds def test_describe_non_existent_option_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.describe_option_groups.when.called_with( - OptionGroupName="not-a-option-group" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_option_groups(OptionGroupName="not-a-option-group") @mock_rds @@ -1345,40 +1357,38 @@ def test_delete_option_group(): OptionGroupDescription="test option group", ) option_groups = conn.describe_option_groups(OptionGroupName="test") - option_groups["OptionGroupsList"][0]["OptionGroupName"].should.equal("test") + assert option_groups["OptionGroupsList"][0]["OptionGroupName"] == "test" conn.delete_option_group(OptionGroupName="test") - conn.describe_option_groups.when.called_with(OptionGroupName="test").should.throw( - ClientError - ) + with pytest.raises(ClientError): + conn.describe_option_groups(OptionGroupName="test") @mock_rds def test_delete_non_existent_option_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.delete_option_group.when.called_with( - OptionGroupName="non-existent" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.delete_option_group(OptionGroupName="non-existent") @mock_rds def test_describe_option_group_options(): conn = boto3.client("rds", region_name=DEFAULT_REGION) option_group_options = conn.describe_option_group_options(EngineName="sqlserver-ee") - len(option_group_options["OptionGroupOptions"]).should.equal(4) + assert len(option_group_options["OptionGroupOptions"]) == 4 option_group_options = conn.describe_option_group_options( EngineName="sqlserver-ee", MajorEngineVersion="11.00" ) - len(option_group_options["OptionGroupOptions"]).should.equal(2) + assert len(option_group_options["OptionGroupOptions"]) == 2 option_group_options = conn.describe_option_group_options( EngineName="mysql", MajorEngineVersion="5.6" ) - len(option_group_options["OptionGroupOptions"]).should.equal(1) - conn.describe_option_group_options.when.called_with( - EngineName="non-existent" - ).should.throw(ClientError) - conn.describe_option_group_options.when.called_with( - EngineName="mysql", MajorEngineVersion="non-existent" - ).should.throw(ClientError) + assert len(option_group_options["OptionGroupOptions"]) == 1 + with pytest.raises(ClientError): + conn.describe_option_group_options(EngineName="non-existent") + with pytest.raises(ClientError): + conn.describe_option_group_options( + EngineName="mysql", MajorEngineVersion="non-existent" + ) @mock_rds @@ -1399,9 +1409,9 @@ def test_modify_option_group(): OptionsToRemove=["MEMCACHED"], ApplyImmediately=True, ) - result["OptionGroup"]["EngineName"].should.equal("mysql") - result["OptionGroup"]["Options"].should.equal([]) - result["OptionGroup"]["OptionGroupName"].should.equal("test") + assert result["OptionGroup"]["EngineName"] == "mysql" + assert result["OptionGroup"]["Options"] == [] + assert result["OptionGroup"]["OptionGroupName"] == "test" @mock_rds @@ -1413,17 +1423,21 @@ def test_modify_option_group_no_options(): MajorEngineVersion="5.6", OptionGroupDescription="test option group", ) - conn.modify_option_group.when.called_with(OptionGroupName="test").should.throw( - ClientError - ) + with pytest.raises(ClientError): + conn.modify_option_group(OptionGroupName="test") @mock_rds def test_modify_non_existent_option_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.modify_option_group.when.called_with( - OptionGroupName="non-existent", OptionsToInclude=[{"OptionName": "test-option"}] - ).should.throw(ClientError, "Specified OptionGroupName: non-existent not found.") + with pytest.raises(ClientError) as client_err: + conn.modify_option_group( + OptionGroupName="non-existent", + OptionsToInclude=[{"OptionName": "test-option"}], + ) + assert client_err.value.response["Error"]["Message"] == ( + "Specified OptionGroupName: non-existent not found." + ) @mock_rds @@ -1440,7 +1454,7 @@ def test_delete_database_with_protection(): with pytest.raises(ClientError) as exc: conn.delete_db_instance(DBInstanceIdentifier="db-primary-1") err = exc.value.response["Error"] - err["Message"].should.equal("Can't delete Instance with protection enabled") + assert err["Message"] == "Can't delete Instance with protection enabled" @mock_rds @@ -1448,18 +1462,15 @@ def test_delete_non_existent_database(): conn = boto3.client("rds", region_name=DEFAULT_REGION) with pytest.raises(ClientError) as ex: conn.delete_db_instance(DBInstanceIdentifier="non-existent") - ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound") - ex.value.response["Error"]["Message"].should.equal( - "DBInstance non-existent not found." - ) + assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound" + assert ex.value.response["Error"]["Message"] == "DBInstance non-existent not found." @mock_rds def test_list_tags_invalid_arn(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.list_tags_for_resource.when.called_with( - ResourceName="arn:aws:rds:bad-arn" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.list_tags_for_resource(ResourceName="arn:aws:rds:bad-arn") @mock_rds @@ -1468,7 +1479,7 @@ def test_list_tags_db(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:foo" ) - result["TagList"].should.equal([]) + assert result["TagList"] == [] test_instance = conn.create_db_instance( DBInstanceIdentifier="db-with-tags", AllocatedStorage=10, @@ -1483,9 +1494,10 @@ def test_list_tags_db(): result = conn.list_tags_for_resource( ResourceName=test_instance["DBInstance"]["DBInstanceArn"] ) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -1505,7 +1517,7 @@ def test_add_tags_db(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 conn.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], @@ -1513,7 +1525,7 @@ def test_add_tags_db(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags" ) - list(result["TagList"]).should.have.length_of(3) + assert len(list(result["TagList"])) == 3 @mock_rds @@ -1533,14 +1545,14 @@ def test_remove_tags_db(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 conn.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags", TagKeys=["foo"] ) result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags" ) - len(result["TagList"]).should.equal(1) + assert len(result["TagList"]) == 1 @mock_rds @@ -1549,7 +1561,7 @@ def test_list_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:foo" ) - result["TagList"].should.equal([]) + assert result["TagList"] == [] conn.create_db_instance( DBInstanceIdentifier="db-primary-1", AllocatedStorage=10, @@ -1569,9 +1581,10 @@ def test_list_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName=snapshot["DBSnapshot"]["DBSnapshotArn"] ) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -1596,7 +1609,7 @@ def test_add_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 conn.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], @@ -1604,7 +1617,7 @@ def test_add_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags" ) - list(result["TagList"]).should.have.length_of(3) + assert len(list(result["TagList"])) == 3 @mock_rds @@ -1629,7 +1642,7 @@ def test_remove_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 conn.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags", TagKeys=["foo"], @@ -1637,7 +1650,7 @@ def test_remove_tags_snapshot(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags" ) - len(result["TagList"]).should.equal(1) + assert len(result["TagList"]) == 1 @mock_rds @@ -1652,7 +1665,7 @@ def test_add_tags_option_group(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - list(result["TagList"]).should.have.length_of(0) + assert len(list(result["TagList"])) == 0 conn.add_tags_to_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}], @@ -1660,7 +1673,7 @@ def test_add_tags_option_group(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 @mock_rds @@ -1682,14 +1695,14 @@ def test_remove_tags_option_group(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - list(result["TagList"]).should.have.length_of(2) + assert len(list(result["TagList"])) == 2 conn.remove_tags_from_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", TagKeys=["foo"] ) result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:og:test" ) - list(result["TagList"]).should.have.length_of(1) + assert len(list(result["TagList"])) == 1 @mock_rds @@ -1699,11 +1712,11 @@ def test_create_database_security_group(): result = conn.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) - result["DBSecurityGroup"]["DBSecurityGroupName"].should.equal("db_sg") - result["DBSecurityGroup"]["DBSecurityGroupDescription"].should.equal( - "DB Security Group" + assert result["DBSecurityGroup"]["DBSecurityGroupName"] == "db_sg" + assert ( + result["DBSecurityGroup"]["DBSecurityGroupDescription"] == "DB Security Group" ) - result["DBSecurityGroup"]["IPRanges"].should.equal([]) + assert result["DBSecurityGroup"]["IPRanges"] == [] @mock_rds @@ -1711,7 +1724,7 @@ def test_get_security_groups(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_security_groups() - result["DBSecurityGroups"].should.have.length_of(0) + assert len(result["DBSecurityGroups"]) == 0 conn.create_db_security_group( DBSecurityGroupName="db_sg1", DBSecurityGroupDescription="DB Security Group" @@ -1721,19 +1734,18 @@ def test_get_security_groups(): ) result = conn.describe_db_security_groups() - result["DBSecurityGroups"].should.have.length_of(2) + assert len(result["DBSecurityGroups"]) == 2 result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg1") - result["DBSecurityGroups"].should.have.length_of(1) - result["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal("db_sg1") + assert len(result["DBSecurityGroups"]) == 1 + assert result["DBSecurityGroups"][0]["DBSecurityGroupName"] == "db_sg1" @mock_rds def test_get_non_existent_security_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.describe_db_security_groups.when.called_with( - DBSecurityGroupName="not-a-sg" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_security_groups(DBSecurityGroupName="not-a-sg") @mock_rds @@ -1744,19 +1756,18 @@ def test_delete_database_security_group(): ) result = conn.describe_db_security_groups() - result["DBSecurityGroups"].should.have.length_of(1) + assert len(result["DBSecurityGroups"]) == 1 conn.delete_db_security_group(DBSecurityGroupName="db_sg") result = conn.describe_db_security_groups() - result["DBSecurityGroups"].should.have.length_of(0) + assert len(result["DBSecurityGroups"]) == 0 @mock_rds def test_delete_non_existent_security_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.delete_db_security_group.when.called_with( - DBSecurityGroupName="not-a-db" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.delete_db_security_group(DBSecurityGroupName="not-a-db") @mock_rds @@ -1765,29 +1776,27 @@ def test_security_group_authorize(): security_group = conn.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) - security_group["DBSecurityGroup"]["IPRanges"].should.equal([]) + assert security_group["DBSecurityGroup"]["IPRanges"] == [] conn.authorize_db_security_group_ingress( DBSecurityGroupName="db_sg", CIDRIP="10.3.2.45/32" ) result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg") - result["DBSecurityGroups"][0]["IPRanges"].should.have.length_of(1) - result["DBSecurityGroups"][0]["IPRanges"].should.equal( - [{"Status": "authorized", "CIDRIP": "10.3.2.45/32"}] - ) + assert len(result["DBSecurityGroups"][0]["IPRanges"]) == 1 + assert result["DBSecurityGroups"][0]["IPRanges"] == [ + {"Status": "authorized", "CIDRIP": "10.3.2.45/32"} + ] conn.authorize_db_security_group_ingress( DBSecurityGroupName="db_sg", CIDRIP="10.3.2.46/32" ) result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg") - result["DBSecurityGroups"][0]["IPRanges"].should.have.length_of(2) - result["DBSecurityGroups"][0]["IPRanges"].should.equal( - [ - {"Status": "authorized", "CIDRIP": "10.3.2.45/32"}, - {"Status": "authorized", "CIDRIP": "10.3.2.46/32"}, - ] - ) + assert len(result["DBSecurityGroups"][0]["IPRanges"]) == 2 + assert result["DBSecurityGroups"][0]["IPRanges"] == [ + {"Status": "authorized", "CIDRIP": "10.3.2.45/32"}, + {"Status": "authorized", "CIDRIP": "10.3.2.46/32"}, + ] @mock_rds @@ -1805,7 +1814,7 @@ def test_add_security_group_to_database(): ) result = conn.describe_db_instances() - result["DBInstances"][0]["DBSecurityGroups"].should.equal([]) + assert result["DBInstances"][0]["DBSecurityGroups"] == [] conn.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" ) @@ -1813,8 +1822,9 @@ def test_add_security_group_to_database(): DBInstanceIdentifier="db-master-1", DBSecurityGroups=["db_sg"] ) result = conn.describe_db_instances() - result["DBInstances"][0]["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal( - "db_sg" + assert ( + result["DBInstances"][0]["DBSecurityGroups"][0]["DBSecurityGroupName"] + == "db_sg" ) @@ -1822,7 +1832,7 @@ def test_add_security_group_to_database(): def test_list_tags_security_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 security_group = conn.create_db_security_group( DBSecurityGroupName="db_sg", @@ -1831,16 +1841,17 @@ def test_list_tags_security_group(): )["DBSecurityGroup"]["DBSecurityGroupName"] resource = f"arn:aws:rds:us-west-2:1234567890:secgrp:{security_group}" result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds def test_add_tags_security_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 security_group = conn.create_db_security_group( DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group" @@ -1853,16 +1864,17 @@ def test_add_tags_security_group(): ) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds def test_remove_tags_security_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 security_group = conn.create_db_security_group( DBSecurityGroupName="db_sg", @@ -1874,7 +1886,7 @@ def test_remove_tags_security_group(): conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal([{"Value": "bar1", "Key": "foo1"}]) + assert result["TagList"] == [{"Value": "bar1", "Key": "foo1"}] @mock_ec2 @@ -1896,11 +1908,11 @@ def test_create_database_subnet_group(): DBSubnetGroupDescription="my db subnet", SubnetIds=subnet_ids, ) - result["DBSubnetGroup"]["DBSubnetGroupName"].should.equal("db_subnet") - result["DBSubnetGroup"]["DBSubnetGroupDescription"].should.equal("my db subnet") + assert result["DBSubnetGroup"]["DBSubnetGroupName"] == "db_subnet" + assert result["DBSubnetGroup"]["DBSubnetGroupDescription"] == "my db subnet" subnets = result["DBSubnetGroup"]["Subnets"] subnet_group_ids = [subnets[0]["SubnetIdentifier"], subnets[1]["SubnetIdentifier"]] - list(subnet_group_ids).should.equal(subnet_ids) + assert list(subnet_group_ids) == subnet_ids @mock_ec2 @@ -1928,7 +1940,7 @@ def test_modify_database_subnet_group(): SubnetIds=[subnet1["SubnetId"], subnet2["SubnetId"]], ) - conn.describe_db_subnet_groups()["DBSubnetGroups"] + _ = conn.describe_db_subnet_groups()["DBSubnetGroups"] # FIXME: Group is deleted atm # TODO: we should check whether all attrs are persisted @@ -1959,7 +1971,7 @@ def test_create_database_in_subnet_group(): DBSubnetGroupName="db_subnet1", ) result = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - result["DBInstances"][0]["DBSubnetGroup"]["DBSubnetGroupName"].should.equal( + assert result["DBInstances"][0]["DBSubnetGroup"]["DBSubnetGroupName"] == ( "db_subnet1" ) @@ -1986,18 +1998,24 @@ def test_describe_database_subnet_group(): ) resp = conn.describe_db_subnet_groups() - resp["DBSubnetGroups"].should.have.length_of(2) + assert len(resp["DBSubnetGroups"]) == 2 subnets = resp["DBSubnetGroups"][0]["Subnets"] - subnets.should.have.length_of(1) + assert len(subnets) == 1 - list( - conn.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1")["DBSubnetGroups"] - ).should.have.length_of(1) + assert ( + len( + list( + conn.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1")[ + "DBSubnetGroups" + ] + ) + ) + == 1 + ) - conn.describe_db_subnet_groups.when.called_with( - DBSubnetGroupName="not-a-subnet" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_subnet_groups(DBSubnetGroupName="not-a-subnet") @mock_ec2 @@ -2011,7 +2029,7 @@ def test_delete_database_subnet_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 conn.create_db_subnet_group( DBSubnetGroupName="db_subnet1", @@ -2019,15 +2037,14 @@ def test_delete_database_subnet_group(): SubnetIds=[subnet["SubnetId"]], ) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(1) + assert len(result["DBSubnetGroups"]) == 1 conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 - conn.delete_db_subnet_group.when.called_with( - DBSubnetGroupName="db_subnet1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") @mock_ec2 @@ -2041,7 +2058,7 @@ def test_list_tags_database_subnet_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 subnet = conn.create_db_subnet_group( DBSubnetGroupName="db_subnet1", @@ -2052,9 +2069,10 @@ def test_list_tags_database_subnet_group(): result = conn.list_tags_for_resource( ResourceName=f"arn:aws:rds:us-west-2:1234567890:subgrp:{subnet}" ) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -2069,18 +2087,18 @@ def test_modify_tags_parameter_group(): ) resource = result["DBParameterGroup"]["DBParameterGroupArn"] result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(client_tags) + assert result["TagList"] == client_tags server_tags = [{"Key": "character_set_server", "Value": "utf-8"}] conn.add_tags_to_resource(ResourceName=resource, Tags=server_tags) combined_tags = client_tags + server_tags result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(combined_tags) + assert result["TagList"] == combined_tags conn.remove_tags_from_resource( ResourceName=resource, TagKeys=["character_set_client"] ) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(server_tags) + assert result["TagList"] == server_tags @mock_rds @@ -2096,16 +2114,16 @@ def test_modify_tags_event_subscription(): ) resource = result["EventSubscription"]["EventSubscriptionArn"] result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(tags) + assert result["TagList"] == tags new_tags = [{"Key": "new_key", "Value": "new_value"}] conn.add_tags_to_resource(ResourceName=resource, Tags=new_tags) combined_tags = tags + new_tags result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(combined_tags) + assert result["TagList"] == combined_tags conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["new_key"]) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal(tags) + assert result["TagList"] == tags @mock_ec2 @@ -2119,7 +2137,7 @@ def test_add_tags_database_subnet_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 subnet = conn.create_db_subnet_group( DBSubnetGroupName="db_subnet1", @@ -2135,9 +2153,10 @@ def test_add_tags_database_subnet_group(): ) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_ec2 @@ -2151,7 +2170,7 @@ def test_remove_tags_database_subnet_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) result = conn.describe_db_subnet_groups() - result["DBSubnetGroups"].should.have.length_of(0) + assert len(result["DBSubnetGroups"]) == 0 subnet = conn.create_db_subnet_group( DBSubnetGroupName="db_subnet1", @@ -2164,7 +2183,7 @@ def test_remove_tags_database_subnet_group(): conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"]) result = conn.list_tags_for_resource(ResourceName=resource) - result["TagList"].should.equal([{"Value": "bar1", "Key": "foo1"}]) + assert result["TagList"] == [{"Value": "bar1", "Key": "foo1"}] @mock_rds @@ -2187,25 +2206,25 @@ def test_create_database_replica(): SourceDBInstanceIdentifier="db-master-1", DBInstanceClass="db.m1.small", ) - replica["DBInstance"]["ReadReplicaSourceDBInstanceIdentifier"].should.equal( + assert replica["DBInstance"]["ReadReplicaSourceDBInstanceIdentifier"] == ( "db-master-1" ) - replica["DBInstance"]["DBInstanceClass"].should.equal("db.m1.small") - replica["DBInstance"]["DBInstanceIdentifier"].should.equal("db-replica-1") + assert replica["DBInstance"]["DBInstanceClass"] == "db.m1.small" + assert replica["DBInstance"]["DBInstanceIdentifier"] == "db-replica-1" master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"].should.equal( + assert master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"] == ( ["db-replica-1"] ) replica = conn.describe_db_instances(DBInstanceIdentifier="db-replica-1")[ "DBInstances" ][0] - replica["ReadReplicaSourceDBInstanceIdentifier"].should.equal("db-master-1") + assert replica["ReadReplicaSourceDBInstanceIdentifier"] == "db-master-1" conn.delete_db_instance(DBInstanceIdentifier="db-replica-1", SkipFinalSnapshot=True) master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") - master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"].should.equal([]) + assert master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"] == [] @mock_rds @@ -2231,14 +2250,12 @@ def test_create_database_replica_cross_region(): source_db = us1.describe_db_instances(DBInstanceIdentifier=source_id)[ "DBInstances" ][0] - source_db.should.have.key("ReadReplicaDBInstanceIdentifiers").equals([target_arn]) + assert source_db["ReadReplicaDBInstanceIdentifiers"] == [target_arn] target_db = us2.describe_db_instances(DBInstanceIdentifier=target_id)[ "DBInstances" ][0] - target_db.should.have.key("ReadReplicaSourceDBInstanceIdentifier").equals( - source_arn - ) + assert target_db["ReadReplicaSourceDBInstanceIdentifier"] == source_arn @mock_rds @@ -2265,8 +2282,8 @@ def test_create_database_with_encrypted_storage(): KmsKeyId=key["KeyMetadata"]["KeyId"], ) - database["DBInstance"]["StorageEncrypted"].should.equal(True) - database["DBInstance"]["KmsKeyId"].should.equal(key["KeyMetadata"]["KeyId"]) + assert database["DBInstance"]["StorageEncrypted"] is True + assert database["DBInstance"]["KmsKeyId"] == key["KeyMetadata"]["KeyId"] @mock_rds @@ -2280,14 +2297,14 @@ def test_create_db_parameter_group(): Description="test parameter group", ) - db_parameter_group["DBParameterGroup"]["DBParameterGroupName"].should.equal("test") - db_parameter_group["DBParameterGroup"]["DBParameterGroupFamily"].should.equal( + assert db_parameter_group["DBParameterGroup"]["DBParameterGroupName"] == "test" + assert db_parameter_group["DBParameterGroup"]["DBParameterGroupFamily"] == ( "mysql5.6" ) - db_parameter_group["DBParameterGroup"]["Description"].should.equal( + assert db_parameter_group["DBParameterGroup"]["Description"] == ( "test parameter group" ) - db_parameter_group["DBParameterGroup"]["DBParameterGroupArn"].should.equal( + assert db_parameter_group["DBParameterGroup"]["DBParameterGroupArn"] == ( f"arn:aws:rds:{region}:{ACCOUNT_ID}:pg:{pg_name}" ) @@ -2312,11 +2329,11 @@ def test_create_db_instance_with_parameter_group(): Port=1234, ) - len(database["DBInstance"]["DBParameterGroups"]).should.equal(1) - database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert len(database["DBInstance"]["DBParameterGroups"]) == 1 + assert database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"] == ( "test" ) - database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal( + assert database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"] == ( "in-sync" ) @@ -2333,7 +2350,7 @@ def test_create_database_with_default_port(): MasterUserPassword="hunter2", DBSecurityGroups=["my_sg"], ) - database["DBInstance"]["Endpoint"]["Port"].should.equal(5432) + assert database["DBInstance"]["Endpoint"]["Port"] == 5432 @mock_rds @@ -2349,11 +2366,11 @@ def test_modify_db_instance_with_parameter_group(): Port=1234, ) - len(database["DBInstance"]["DBParameterGroups"]).should.equal(1) - database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert len(database["DBInstance"]["DBParameterGroups"]) == 1 + assert database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"] == ( "default.mysql5.6" ) - database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal( + assert database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"] == ( "in-sync" ) @@ -2371,17 +2388,20 @@ def test_modify_db_instance_with_parameter_group(): database = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")[ "DBInstances" ][0] - len(database["DBParameterGroups"]).should.equal(1) - database["DBParameterGroups"][0]["DBParameterGroupName"].should.equal("test") - database["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync") + assert len(database["DBParameterGroups"]) == 1 + assert database["DBParameterGroups"][0]["DBParameterGroupName"] == "test" + assert database["DBParameterGroups"][0]["ParameterApplyStatus"] == "in-sync" @mock_rds def test_create_db_parameter_group_empty_description(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.create_db_parameter_group.when.called_with( - DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description="" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_db_parameter_group( + DBParameterGroupName="test", + DBParameterGroupFamily="mysql5.6", + Description="", + ) @mock_rds @@ -2392,11 +2412,12 @@ def test_create_db_parameter_group_duplicate(): DBParameterGroupFamily="mysql5.6", Description="test parameter group", ) - conn.create_db_parameter_group.when.called_with( - DBParameterGroupName="test", - DBParameterGroupFamily="mysql5.6", - Description="test parameter group", - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.create_db_parameter_group( + DBParameterGroupName="test", + DBParameterGroupFamily="mysql5.6", + Description="test parameter group", + ) @mock_rds @@ -2410,10 +2431,10 @@ def test_describe_db_parameter_group(): Description="test parameter group", ) db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") - db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"] == ( "test" ) - db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupArn"].should.equal( + assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupArn"] == ( f"arn:aws:rds:{region}:{ACCOUNT_ID}:pg:{pg_name}" ) @@ -2422,7 +2443,7 @@ def test_describe_db_parameter_group(): def test_describe_non_existent_db_parameter_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") - len(db_parameter_groups["DBParameterGroups"]).should.equal(0) + assert len(db_parameter_groups["DBParameterGroups"]) == 0 @mock_rds @@ -2434,12 +2455,12 @@ def test_delete_db_parameter_group(): Description="test parameter group", ) db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") - db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"].should.equal( + assert db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"] == ( "test" ) conn.delete_db_parameter_group(DBParameterGroupName="test") db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test") - len(db_parameter_groups["DBParameterGroups"]).should.equal(0) + assert len(db_parameter_groups["DBParameterGroups"]) == 0 @mock_rds @@ -2463,21 +2484,20 @@ def test_modify_db_parameter_group(): ], ) - modify_result["DBParameterGroupName"].should.equal("test") + assert modify_result["DBParameterGroupName"] == "test" db_parameters = conn.describe_db_parameters(DBParameterGroupName="test") - db_parameters["Parameters"][0]["ParameterName"].should.equal("foo") - db_parameters["Parameters"][0]["ParameterValue"].should.equal("foo_val") - db_parameters["Parameters"][0]["Description"].should.equal("test param") - db_parameters["Parameters"][0]["ApplyMethod"].should.equal("immediate") + assert db_parameters["Parameters"][0]["ParameterName"] == "foo" + assert db_parameters["Parameters"][0]["ParameterValue"] == "foo_val" + assert db_parameters["Parameters"][0]["Description"] == "test param" + assert db_parameters["Parameters"][0]["ApplyMethod"] == "immediate" @mock_rds def test_delete_non_existent_db_parameter_group(): conn = boto3.client("rds", region_name=DEFAULT_REGION) - conn.delete_db_parameter_group.when.called_with( - DBParameterGroupName="non-existent" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.delete_db_parameter_group(DBParameterGroupName="non-existent") @mock_rds @@ -2492,7 +2512,7 @@ def test_create_parameter_group_with_tags(): result = conn.list_tags_for_resource( ResourceName="arn:aws:rds:us-west-2:1234567890:pg:test" ) - result["TagList"].should.equal([{"Value": "bar", "Key": "foo"}]) + assert result["TagList"] == [{"Value": "bar", "Key": "foo"}] @mock_rds @@ -2507,13 +2527,13 @@ def test_create_db_with_iam_authentication(): ) db_instance = database["DBInstance"] - db_instance["IAMDatabaseAuthenticationEnabled"].should.equal(True) + assert db_instance["IAMDatabaseAuthenticationEnabled"] is True snapshot = conn.create_db_snapshot( DBInstanceIdentifier="rds", DBSnapshotIdentifier="snapshot" ).get("DBSnapshot") - snapshot.get("IAMDatabaseAuthenticationEnabled").should.equal(True) + assert snapshot.get("IAMDatabaseAuthenticationEnabled") is True @mock_rds @@ -2528,10 +2548,10 @@ def test_create_db_instance_with_tags(): DBInstanceClass="db.m1.small", Tags=tags, ) - resp["DBInstance"]["TagList"].should.equal(tags) + assert resp["DBInstance"]["TagList"] == tags resp = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier) - resp["DBInstances"][0]["TagList"].should.equal(tags) + assert resp["DBInstances"][0]["TagList"] == tags @mock_rds @@ -2545,10 +2565,10 @@ def test_create_db_instance_without_availability_zone(): DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - resp["DBInstance"]["AvailabilityZone"].should.contain(region) + assert region in resp["DBInstance"]["AvailabilityZone"] resp = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier) - resp["DBInstances"][0]["AvailabilityZone"].should.contain(region) + assert region in resp["DBInstances"][0]["AvailabilityZone"] @mock_rds @@ -2564,10 +2584,10 @@ def test_create_db_instance_with_availability_zone(): DBInstanceClass="db.m1.small", AvailabilityZone=availability_zone, ) - resp["DBInstance"]["AvailabilityZone"].should.equal(availability_zone) + assert resp["DBInstance"]["AvailabilityZone"] == availability_zone resp = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier) - resp["DBInstances"][0]["AvailabilityZone"].should.equal(availability_zone) + assert resp["DBInstances"][0]["AvailabilityZone"] == availability_zone @mock_rds @@ -2614,6 +2634,7 @@ def validation_helper(exc): assert err["Code"] == "InvalidParameterValue" assert err["Message"] == ( "The parameter DBInstanceIdentifier is not a valid identifier. " - "Identifiers must begin with a letter; must contain only ASCII letters, digits, and hyphens; " + "Identifiers must begin with a letter; must contain only ASCII " + "letters, digits, and hyphens; " "and must not end with a hyphen or contain two consecutive hyphens." ) diff --git a/tests/test_rds/test_rds_cloudformation.py b/tests/test_rds/test_rds_cloudformation.py index 6a2ba1441..f256203df 100644 --- a/tests/test_rds/test_rds_cloudformation.py +++ b/tests/test_rds/test_rds_cloudformation.py @@ -1,6 +1,7 @@ -import boto3 import json -import sure # noqa # pylint: disable=unused-import + +import boto3 + from moto import mock_cloudformation, mock_ec2, mock_rds from tests.test_cloudformation.fixtures import rds_mysql_with_db_parameter_group from tests.test_cloudformation.fixtures import rds_mysql_with_read_replica @@ -36,12 +37,12 @@ def test_create_subnetgroup_via_cf(): cf.create_stack(StackName="test_stack", TemplateBody=template_json) response = rds.describe_db_subnet_groups()["DBSubnetGroups"] - response.should.have.length_of(1) + assert len(response) == 1 created_subnet = response[0] - created_subnet.should.have.key("DBSubnetGroupName").equal("subnetgroupname") - created_subnet.should.have.key("DBSubnetGroupDescription").equal("subnetgroupdesc") - created_subnet.should.have.key("VpcId").equal(vpc["VpcId"]) + assert created_subnet["DBSubnetGroupName"] == "subnetgroupname" + assert created_subnet["DBSubnetGroupDescription"] == "subnetgroupdesc" + assert created_subnet["VpcId"] == vpc["VpcId"] @mock_ec2 @@ -82,19 +83,19 @@ def test_create_dbinstance_via_cf(): db_instance_identifier = summaries[0]["PhysicalResourceId"] resp = rds.describe_db_instances()["DBInstances"] - resp.should.have.length_of(1) + assert len(resp) == 1 created = resp[0] - created["DBInstanceIdentifier"].should.equal(db_instance_identifier) - created["Engine"].should.equal("mysql") - created["DBInstanceStatus"].should.equal("available") + assert created["DBInstanceIdentifier"] == db_instance_identifier + assert created["Engine"] == "mysql" + assert created["DBInstanceStatus"] == "available" # Verify the stack outputs are correct o = _get_stack_outputs(cf, stack_name="test_stack") - o.should.have.key("db_address").equals( + assert o["db_address"] == ( f"{db_instance_identifier}.aaaaaaaaaa.us-west-2.rds.amazonaws.com" ) - o.should.have.key("db_port").equals("3307") + assert o["db_port"] == "3307" @mock_ec2 @@ -121,10 +122,10 @@ def test_create_dbsecuritygroup_via_cf(): cf.create_stack(StackName="test_stack", TemplateBody=template_json) result = rds.describe_db_security_groups()["DBSecurityGroups"] - result.should.have.length_of(1) + assert len(result) == 1 created = result[0] - created["DBSecurityGroupDescription"].should.equal("my sec group") + assert created["DBSecurityGroupDescription"] == "my sec group" @mock_cloudformation @@ -159,7 +160,7 @@ def test_rds_db_parameter_groups(): rds_conn = boto3.client("rds", region_name="us-west-1") db_parameter_groups = rds_conn.describe_db_parameter_groups() - db_parameter_groups["DBParameterGroups"].should.have.length_of(1) + assert len(db_parameter_groups["DBParameterGroups"]) == 1 db_parameter_group_name = db_parameter_groups["DBParameterGroups"][0][ "DBParameterGroupName" ] @@ -174,7 +175,7 @@ def test_rds_db_parameter_groups(): ): found_cloudformation_set_parameter = True - found_cloudformation_set_parameter.should.equal(True) + assert found_cloudformation_set_parameter is True @mock_cloudformation @@ -260,7 +261,7 @@ def test_rds_mysql_with_read_replica_in_vpc(): subnet_group = rds.describe_db_subnet_groups(DBSubnetGroupName=subnet_group_name)[ "DBSubnetGroups" ][0] - subnet_group.should.have.key("DBSubnetGroupDescription").equal("my db subnet group") + assert subnet_group["DBSubnetGroupDescription"] == "my db subnet group" @mock_ec2 @@ -292,12 +293,12 @@ def test_delete_dbinstance_via_cf(): cf.create_stack(StackName="test_stack", TemplateBody=template_json) resp = rds.describe_db_instances()["DBInstances"] - resp.should.have.length_of(1) + assert len(resp) == 1 cf.delete_stack(StackName="test_stack") resp = rds.describe_db_instances()["DBInstances"] - resp.should.have.length_of(0) + assert len(resp) == 0 def _get_stack_outputs(cf_client, stack_name): diff --git a/tests/test_rds/test_rds_clusters.py b/tests/test_rds/test_rds_clusters.py index 2b83979ef..678714d34 100644 --- a/tests/test_rds/test_rds_clusters.py +++ b/tests/test_rds/test_rds_clusters.py @@ -1,8 +1,9 @@ -import boto3 -import pytest -import sure # noqa # pylint: disable=unused-import +import re +import boto3 from botocore.exceptions import ClientError +import pytest + from moto import mock_rds from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID @@ -12,7 +13,7 @@ def test_describe_db_cluster_initial(): client = boto3.client("rds", region_name="eu-north-1") resp = client.describe_db_clusters() - resp.should.have.key("DBClusters").should.have.length_of(0) + assert len(resp["DBClusters"]) == 0 @mock_rds @@ -20,12 +21,12 @@ def test_describe_db_cluster_fails_for_non_existent_cluster(): client = boto3.client("rds", region_name="eu-north-1") resp = client.describe_db_clusters() - resp.should.have.key("DBClusters").should.have.length_of(0) + assert len(resp["DBClusters"]) == 0 with pytest.raises(ClientError) as ex: client.describe_db_clusters(DBClusterIdentifier="cluster-id") err = ex.value.response["Error"] - err["Code"].should.equal("DBClusterNotFoundFault") - err["Message"].should.equal("DBCluster cluster-id not found.") + assert err["Code"] == "DBClusterNotFoundFault" + assert err["Message"] == "DBCluster cluster-id not found." @mock_rds @@ -35,8 +36,8 @@ def test_create_db_cluster_needs_master_username(): with pytest.raises(ClientError) as ex: client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="aurora") err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( "The parameter MasterUsername must be provided and must not be blank." ) @@ -50,8 +51,8 @@ def test_create_db_cluster_needs_master_user_password(): DBClusterIdentifier="cluster-id", Engine="aurora", MasterUsername="root" ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( "The parameter MasterUserPassword must be provided and must not be blank." ) @@ -68,9 +69,10 @@ def test_create_db_cluster_needs_long_master_user_password(): MasterUserPassword="hunter2", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( - "The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters." + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( + "The parameter MasterUserPassword is not a valid password because " + "it is shorter than 8 characters." ) @@ -91,9 +93,10 @@ def test_modify_db_cluster_needs_long_master_user_password(): MasterUserPassword="hunter2", ) err = ex.value.response["Error"] - err["Code"].should.equal("InvalidParameterValue") - err["Message"].should.equal( - "The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters." + assert err["Code"] == "InvalidParameterValue" + assert err["Message"] == ( + "The parameter MasterUserPassword is not a valid password because " + "it is shorter than 8 characters." ) @@ -116,7 +119,7 @@ def test_modify_db_cluster_new_cluster_identifier(): MasterUserPassword="hunter21", ) - resp["DBCluster"].should.have.key("DBClusterIdentifier").equal(new_id) + assert resp["DBCluster"]["DBClusterIdentifier"] == new_id clusters = [ cluster["DBClusterIdentifier"] @@ -136,61 +139,61 @@ def test_create_db_cluster__verify_default_properties(): MasterUsername="root", MasterUserPassword="hunter2_", ) - resp.should.have.key("DBCluster") + assert "DBCluster" in resp cluster = resp["DBCluster"] - cluster.shouldnt.have.key( - "DatabaseName" - ) # This was not supplied, so should not be returned + # This was not supplied, so should not be returned + assert "DatabaseName" not in cluster - cluster.should.have.key("AvailabilityZones") - set(cluster["AvailabilityZones"]).should.equal( - {"eu-north-1a", "eu-north-1b", "eu-north-1c"} - ) - cluster.should.have.key("BackupRetentionPeriod").equal(1) - cluster.should.have.key("DBClusterIdentifier").equal("cluster-id") - cluster.should.have.key("DBClusterParameterGroup").equal("default.aurora8.0") - cluster.should.have.key("DBSubnetGroup").equal("default") - cluster.should.have.key("Status").equal("creating") - cluster.should.have.key("Endpoint").match( - "cluster-id.cluster-[a-z0-9]{12}.eu-north-1.rds.amazonaws.com" + assert "AvailabilityZones" in cluster + assert set(cluster["AvailabilityZones"]) == { + "eu-north-1a", + "eu-north-1b", + "eu-north-1c", + } + assert cluster["BackupRetentionPeriod"] == 1 + assert cluster["DBClusterIdentifier"] == "cluster-id" + assert cluster["DBClusterParameterGroup"] == "default.aurora8.0" + assert cluster["DBSubnetGroup"] == "default" + assert cluster["Status"] == "creating" + assert re.match( + "cluster-id.cluster-[a-z0-9]{12}.eu-north-1.rds.amazonaws.com", + cluster["Endpoint"], ) endpoint = cluster["Endpoint"] expected_readonly = endpoint.replace( "cluster-id.cluster-", "cluster-id.cluster-ro-" ) - cluster.should.have.key("ReaderEndpoint").equal(expected_readonly) - cluster.should.have.key("MultiAZ").equal(False) - cluster.should.have.key("Engine").equal("aurora") - cluster.should.have.key("EngineVersion").equal("5.6.mysql_aurora.1.22.5") - cluster.should.have.key("Port").equal(3306) - cluster.should.have.key("MasterUsername").equal("root") - cluster.should.have.key("PreferredBackupWindow").equal("01:37-02:07") - cluster.should.have.key("PreferredMaintenanceWindow").equal("wed:02:40-wed:03:10") - cluster.should.have.key("ReadReplicaIdentifiers").equal([]) - cluster.should.have.key("DBClusterMembers").equal([]) - cluster.should.have.key("VpcSecurityGroups") - cluster.should.have.key("HostedZoneId") - cluster.should.have.key("StorageEncrypted").equal(False) - cluster.should.have.key("DbClusterResourceId").match(r"cluster-[A-Z0-9]{26}") - cluster.should.have.key("DBClusterArn").equal( + assert cluster["ReaderEndpoint"] == expected_readonly + assert cluster["MultiAZ"] is False + assert cluster["Engine"] == "aurora" + assert cluster["EngineVersion"] == "5.6.mysql_aurora.1.22.5" + assert cluster["Port"] == 3306 + assert cluster["MasterUsername"] == "root" + assert cluster["PreferredBackupWindow"] == "01:37-02:07" + assert cluster["PreferredMaintenanceWindow"] == "wed:02:40-wed:03:10" + assert cluster["ReadReplicaIdentifiers"] == [] + assert cluster["DBClusterMembers"] == [] + assert "VpcSecurityGroups" in cluster + assert "HostedZoneId" in cluster + assert cluster["StorageEncrypted"] is False + assert re.match(r"cluster-[A-Z0-9]{26}", cluster["DbClusterResourceId"]) + assert cluster["DBClusterArn"] == ( f"arn:aws:rds:eu-north-1:{ACCOUNT_ID}:cluster:cluster-id" ) - cluster.should.have.key("AssociatedRoles").equal([]) - cluster.should.have.key("IAMDatabaseAuthenticationEnabled").equal(False) - cluster.should.have.key("EngineMode").equal("provisioned") - cluster.should.have.key("DeletionProtection").equal(False) - cluster.should.have.key("HttpEndpointEnabled").equal(False) - cluster.should.have.key("CopyTagsToSnapshot").equal(False) - cluster.should.have.key("CrossAccountClone").equal(False) - cluster.should.have.key("DeletionProtection").equal(False) - cluster.should.have.key("DomainMemberships").equal([]) - cluster.should.have.key("TagList").equal([]) - cluster.should.have.key("ClusterCreateTime") - cluster.should.have.key( - "EarliestRestorableTime" - ).should.be.greater_than_or_equal_to(cluster["ClusterCreateTime"]) + assert cluster["AssociatedRoles"] == [] + assert cluster["IAMDatabaseAuthenticationEnabled"] is False + assert cluster["EngineMode"] == "provisioned" + assert cluster["DeletionProtection"] is False + assert cluster["HttpEndpointEnabled"] is False + assert cluster["CopyTagsToSnapshot"] is False + assert cluster["CrossAccountClone"] is False + assert cluster["DeletionProtection"] is False + assert cluster["DomainMemberships"] == [] + assert cluster["TagList"] == [] + assert "ClusterCreateTime" in cluster + assert cluster["EarliestRestorableTime"] >= cluster["ClusterCreateTime"] @mock_rds @@ -221,14 +224,14 @@ def test_create_db_cluster_additional_parameters(): cluster = resp["DBCluster"] - cluster.should.have.key("AvailabilityZones").equal(["eu-north-1b"]) - cluster.should.have.key("DatabaseName").equal("users") - cluster.should.have.key("Engine").equal("aurora") - cluster.should.have.key("EngineVersion").equal("8.0.mysql_aurora.3.01.0") - cluster.should.have.key("EngineMode").equal("serverless") - cluster.should.have.key("Port").equal(1234) - cluster.should.have.key("DeletionProtection").equal(True) - cluster.should.have.key("EnabledCloudwatchLogsExports").equals(["audit"]) + assert cluster["AvailabilityZones"] == ["eu-north-1b"] + assert cluster["DatabaseName"] == "users" + assert cluster["Engine"] == "aurora" + assert cluster["EngineVersion"] == "8.0.mysql_aurora.3.01.0" + assert cluster["EngineMode"] == "serverless" + assert cluster["Port"] == 1234 + assert cluster["DeletionProtection"] is True + assert cluster["EnabledCloudwatchLogsExports"] == ["audit"] assert cluster["KmsKeyId"] == "some:kms:arn" assert cluster["NetworkType"] == "IPV4" assert cluster["DBSubnetGroup"] == "subnetgroupname" @@ -258,15 +261,19 @@ def test_describe_db_cluster_after_creation(): MasterUserPassword="hunter2_", )["DBCluster"]["DBClusterArn"] - client.describe_db_clusters()["DBClusters"].should.have.length_of(2) + assert len(client.describe_db_clusters()["DBClusters"]) == 2 - client.describe_db_clusters(DBClusterIdentifier="cluster-id2")[ - "DBClusters" - ].should.have.length_of(1) + assert ( + len( + client.describe_db_clusters(DBClusterIdentifier="cluster-id2")["DBClusters"] + ) + == 1 + ) - client.describe_db_clusters(DBClusterIdentifier=cluster_arn)[ - "DBClusters" - ].should.have.length_of(1) + assert ( + len(client.describe_db_clusters(DBClusterIdentifier=cluster_arn)["DBClusters"]) + == 1 + ) @mock_rds @@ -282,7 +289,7 @@ def test_delete_db_cluster(): client.delete_db_cluster(DBClusterIdentifier="cluster-id") - client.describe_db_clusters()["DBClusters"].should.have.length_of(0) + assert len(client.describe_db_clusters()["DBClusters"]) == 0 @mock_rds @@ -299,7 +306,7 @@ def test_delete_db_cluster_do_snapshot(): client.delete_db_cluster( DBClusterIdentifier="cluster-id", FinalDBSnapshotIdentifier="final-snapshot" ) - client.describe_db_clusters()["DBClusters"].should.have.length_of(0) + assert len(client.describe_db_clusters()["DBClusters"]) == 0 snapshot = client.describe_db_cluster_snapshots()["DBClusterSnapshots"][0] assert snapshot["DBClusterIdentifier"] == "cluster-id" assert snapshot["DBClusterSnapshotIdentifier"] == "final-snapshot" @@ -321,7 +328,7 @@ def test_delete_db_cluster_that_is_protected(): with pytest.raises(ClientError) as exc: client.delete_db_cluster(DBClusterIdentifier="cluster-id") err = exc.value.response["Error"] - err["Message"].should.equal("Can't delete Cluster with protection enabled") + assert err["Message"] == "Can't delete Cluster with protection enabled" @mock_rds @@ -331,8 +338,8 @@ def test_delete_db_cluster_unknown_cluster(): with pytest.raises(ClientError) as ex: client.delete_db_cluster(DBClusterIdentifier="cluster-unknown") err = ex.value.response["Error"] - err["Code"].should.equal("DBClusterNotFoundFault") - err["Message"].should.equal("DBCluster cluster-unknown not found.") + assert err["Code"] == "DBClusterNotFoundFault" + assert err["Message"] == "DBCluster cluster-unknown not found." @mock_rds @@ -342,8 +349,8 @@ def test_start_db_cluster_unknown_cluster(): with pytest.raises(ClientError) as ex: client.start_db_cluster(DBClusterIdentifier="cluster-unknown") err = ex.value.response["Error"] - err["Code"].should.equal("DBClusterNotFoundFault") - err["Message"].should.equal("DBCluster cluster-unknown not found.") + assert err["Code"] == "DBClusterNotFoundFault" + assert err["Message"] == "DBCluster cluster-unknown not found." @mock_rds @@ -360,7 +367,7 @@ def test_start_db_cluster_after_stopping(): client.start_db_cluster(DBClusterIdentifier="cluster-id") cluster = client.describe_db_clusters()["DBClusters"][0] - cluster["Status"].should.equal("available") + assert cluster["Status"] == "available" @mock_rds @@ -377,8 +384,8 @@ def test_start_db_cluster_without_stopping(): with pytest.raises(ClientError) as ex: client.start_db_cluster(DBClusterIdentifier="cluster-id") err = ex.value.response["Error"] - err["Code"].should.equal("InvalidDBClusterStateFault") - err["Message"].should.equal("DbCluster cluster-id is not in stopped state.") + assert err["Code"] == "InvalidDBClusterStateFault" + assert err["Message"] == "DbCluster cluster-id is not in stopped state." @mock_rds @@ -395,11 +402,11 @@ def test_stop_db_cluster(): resp = client.stop_db_cluster(DBClusterIdentifier="cluster-id") # Quirk of the AWS implementation - the immediate response show it's still available cluster = resp["DBCluster"] - cluster["Status"].should.equal("available") + assert cluster["Status"] == "available" # For some time the status will be 'stopping' # And finally it will be 'stopped' cluster = client.describe_db_clusters()["DBClusters"][0] - cluster["Status"].should.equal("stopped") + assert cluster["Status"] == "stopped" @mock_rds @@ -418,8 +425,8 @@ def test_stop_db_cluster_already_stopped(): with pytest.raises(ClientError) as ex: client.stop_db_cluster(DBClusterIdentifier="cluster-id") err = ex.value.response["Error"] - err["Code"].should.equal("InvalidDBClusterStateFault") - err["Message"].should.equal("DbCluster cluster-id is not in available state.") + assert err["Code"] == "InvalidDBClusterStateFault" + assert err["Message"] == "DbCluster cluster-id is not in available state." @mock_rds @@ -429,8 +436,8 @@ def test_stop_db_cluster_unknown_cluster(): with pytest.raises(ClientError) as ex: client.stop_db_cluster(DBClusterIdentifier="cluster-unknown") err = ex.value.response["Error"] - err["Code"].should.equal("DBClusterNotFoundFault") - err["Message"].should.equal("DBCluster cluster-unknown not found.") + assert err["Code"] == "DBClusterNotFoundFault" + assert err["Message"] == "DBCluster cluster-unknown not found." @mock_rds @@ -441,7 +448,7 @@ def test_create_db_cluster_snapshot_fails_for_unknown_cluster(): DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" ) err = exc.value.response["Error"] - err["Message"].should.equal("DBCluster db-primary-1 not found.") + assert err["Message"] == "DBCluster db-primary-1 not found." @mock_rds @@ -467,7 +474,7 @@ def test_create_db_cluster_snapshot(): assert snapshot["DBClusterSnapshotIdentifier"] == "g-1" assert snapshot["SnapshotType"] == "manual" result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"]) - result["TagList"].should.equal([]) + assert result["TagList"] == [] @mock_rds @@ -491,14 +498,15 @@ def test_create_db_cluster_snapshot_copy_tags(): DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="g-1" ).get("DBClusterSnapshot") - snapshot.get("Engine").should.equal("postgres") - snapshot.get("DBClusterIdentifier").should.equal("db-primary-1") - snapshot.get("DBClusterSnapshotIdentifier").should.equal("g-1") + assert snapshot.get("Engine") == "postgres" + assert snapshot.get("DBClusterIdentifier") == "db-primary-1" + assert snapshot.get("DBClusterSnapshotIdentifier") == "g-1" result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"]) - result["TagList"].should.equal( - [{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] - ) + assert result["TagList"] == [ + {"Value": "bar", "Key": "foo"}, + {"Value": "bar1", "Key": "foo1"}, + ] @mock_rds @@ -512,7 +520,7 @@ def test_copy_db_cluster_snapshot_fails_for_unknown_snapshot(): ) err = exc.value.response["Error"] - err["Message"].should.equal("DBClusterSnapshot snapshot-1 not found.") + assert err["Message"] == "DBClusterSnapshot snapshot-1 not found." @mock_rds @@ -539,13 +547,13 @@ def test_copy_db_cluster_snapshot(): TargetDBClusterSnapshotIdentifier="snapshot-2", ).get("DBClusterSnapshot") - target_snapshot.get("Engine").should.equal("postgres") - target_snapshot.get("DBClusterIdentifier").should.equal("db-primary-1") - target_snapshot.get("DBClusterSnapshotIdentifier").should.equal("snapshot-2") + assert target_snapshot.get("Engine") == "postgres" + assert target_snapshot.get("DBClusterIdentifier") == "db-primary-1" + assert target_snapshot.get("DBClusterSnapshotIdentifier") == "snapshot-2" result = conn.list_tags_for_resource( ResourceName=target_snapshot["DBClusterSnapshotArn"] ) - result["TagList"].should.equal([]) + assert result["TagList"] == [] @mock_rds @@ -578,8 +586,9 @@ def test_copy_db_cluster_snapshot_fails_for_existed_target_snapshot(): ) err = exc.value.response["Error"] - err["Message"].should.equal( - "Cannot create the snapshot because a snapshot with the identifier snapshot-2 already exists." + assert err["Message"] == ( + "Cannot create the snapshot because a snapshot with the identifier " + "snapshot-2 already exists." ) @@ -601,7 +610,7 @@ def test_describe_db_cluster_snapshots(): DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" ).get("DBClusterSnapshot") - created.get("Engine").should.equal("postgres") + assert created.get("Engine") == "postgres" by_database_id = conn.describe_db_cluster_snapshots( DBClusterIdentifier="db-primary-1" @@ -609,11 +618,11 @@ def test_describe_db_cluster_snapshots(): by_snapshot_id = conn.describe_db_cluster_snapshots( DBClusterSnapshotIdentifier="snapshot-1" ).get("DBClusterSnapshots") - by_snapshot_id.should.equal(by_database_id) + assert by_snapshot_id == by_database_id snapshot = by_snapshot_id[0] - snapshot.should.equal(created) - snapshot.get("Engine").should.equal("postgres") + assert snapshot == created + assert snapshot.get("Engine") == "postgres" conn.create_db_cluster_snapshot( DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-2" @@ -621,7 +630,7 @@ def test_describe_db_cluster_snapshots(): snapshots = conn.describe_db_cluster_snapshots( DBClusterIdentifier="db-primary-1" ).get("DBClusterSnapshots") - snapshots.should.have.length_of(2) + assert len(snapshots) == 2 @mock_rds @@ -643,9 +652,8 @@ def test_delete_db_cluster_snapshot(): conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1") conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier="snapshot-1") - conn.describe_db_cluster_snapshots.when.called_with( - DBClusterSnapshotIdentifier="snapshot-1" - ).should.throw(ClientError) + with pytest.raises(ClientError): + conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1") @mock_rds @@ -661,7 +669,7 @@ def test_restore_db_cluster_from_snapshot(): MasterUserPassword="hunter2000", Port=1234, ) - conn.describe_db_clusters()["DBClusters"].should.have.length_of(1) + assert len(conn.describe_db_clusters()["DBClusters"]) == 1 conn.create_db_cluster_snapshot( DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" @@ -673,17 +681,18 @@ def test_restore_db_cluster_from_snapshot(): SnapshotIdentifier="snapshot-1", Engine="postgres", )["DBCluster"] - new_cluster["DBClusterIdentifier"].should.equal("db-restore-1") - new_cluster["DBClusterInstanceClass"].should.equal("db.m1.small") - new_cluster["Engine"].should.equal("postgres") - new_cluster["DatabaseName"].should.equal("staging-postgres") - new_cluster["Port"].should.equal(1234) + assert new_cluster["DBClusterIdentifier"] == "db-restore-1" + assert new_cluster["DBClusterInstanceClass"] == "db.m1.small" + assert new_cluster["Engine"] == "postgres" + assert new_cluster["DatabaseName"] == "staging-postgres" + assert new_cluster["Port"] == 1234 # Verify it exists - conn.describe_db_clusters()["DBClusters"].should.have.length_of(2) - conn.describe_db_clusters(DBClusterIdentifier="db-restore-1")[ - "DBClusters" - ].should.have.length_of(1) + assert len(conn.describe_db_clusters()["DBClusters"]) == 2 + assert ( + len(conn.describe_db_clusters(DBClusterIdentifier="db-restore-1")["DBClusters"]) + == 1 + ) @mock_rds @@ -699,7 +708,7 @@ def test_restore_db_cluster_from_snapshot_and_override_params(): MasterUserPassword="hunter2000", Port=1234, ) - conn.describe_db_clusters()["DBClusters"].should.have.length_of(1) + assert len(conn.describe_db_clusters()["DBClusters"]) == 1 conn.create_db_cluster_snapshot( DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" ) @@ -712,10 +721,10 @@ def test_restore_db_cluster_from_snapshot_and_override_params(): Port=10000, DBClusterInstanceClass="db.r6g.xlarge", )["DBCluster"] - new_cluster["DBClusterIdentifier"].should.equal("db-restore-1") - new_cluster["DBClusterParameterGroup"].should.equal("default.aurora8.0") - new_cluster["DBClusterInstanceClass"].should.equal("db.r6g.xlarge") - new_cluster["Port"].should.equal(10000) + assert new_cluster["DBClusterIdentifier"] == "db-restore-1" + assert new_cluster["DBClusterParameterGroup"] == "default.aurora8.0" + assert new_cluster["DBClusterInstanceClass"] == "db.r6g.xlarge" + assert new_cluster["Port"] == 10000 @mock_rds @@ -739,12 +748,12 @@ def test_add_tags_to_cluster(): ) tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"] - tags.should.equal([{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]) + assert tags == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}] conn.remove_tags_from_resource(ResourceName=cluster_arn, TagKeys=["k1"]) tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"] - tags.should.equal([{"Key": "k2", "Value": "v2"}]) + assert tags == [{"Key": "k2", "Value": "v2"}] @mock_rds @@ -771,12 +780,12 @@ def test_add_tags_to_cluster_snapshot(): ) tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"] - tags.should.equal([{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]) + assert tags == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}] conn.remove_tags_from_resource(ResourceName=snapshot_arn, TagKeys=["k1"]) tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"] - tags.should.equal([{"Key": "k2", "Value": "v2"}]) + assert tags == [{"Key": "k2", "Value": "v2"}] @mock_rds @@ -795,7 +804,7 @@ def test_create_serverless_db_cluster(): ) cluster = resp["DBCluster"] # This is only true for specific engine versions - cluster.should.have.key("HttpEndpointEnabled").equal(True) + assert cluster["HttpEndpointEnabled"] is True # Verify that a default serverless_configuration is added assert "ScalingConfigurationInfo" in cluster @@ -819,7 +828,7 @@ def test_create_db_cluster_with_enable_http_endpoint_invalid(): ) cluster = resp["DBCluster"] # This attribute is ignored if an invalid engine version is supplied - cluster.should.have.key("HttpEndpointEnabled").equal(False) + assert cluster["HttpEndpointEnabled"] is False @mock_rds @@ -859,8 +868,10 @@ def test_describe_db_clusters_filter_by_engine(): @mock_rds def test_replicate_cluster(): # WHEN create_db_cluster is called - # AND create_db_cluster is called again with ReplicationSourceIdentifier set to the first cluster - # THEN promote_read_replica_db_cluster can be called on the second cluster, elevating it to a read/write cluster + # AND create_db_cluster is called again with ReplicationSourceIdentifier + # set to the first cluster + # THEN promote_read_replica_db_cluster can be called on the second + # cluster, elevating it to a read/write cluster us_east = boto3.client("rds", "us-east-1") us_west = boto3.client("rds", "us-west-1") diff --git a/tests/test_rds/test_rds_clusters_with_instances.py b/tests/test_rds/test_rds_clusters_with_instances.py index 90605615c..707999327 100644 --- a/tests/test_rds/test_rds_clusters_with_instances.py +++ b/tests/test_rds/test_rds_clusters_with_instances.py @@ -1,7 +1,7 @@ import boto3 +from botocore.exceptions import ClientError import pytest -from botocore.exceptions import ClientError from moto import mock_rds @@ -11,7 +11,7 @@ def test_add_instance_as_cluster_member(): # the instance is included as a ClusterMember in the describe_db_clusters call client = boto3.client("rds", "us-east-1") - client.create_db_cluster( + _ = client.create_db_cluster( DBClusterIdentifier="dbci", Engine="mysql", MasterUsername="masterusername", @@ -43,7 +43,7 @@ def test_remove_instance_from_cluster(): # the instance is included as a ClusterMember in the describe_db_clusters call client = boto3.client("rds", "us-east-1") - client.create_db_cluster( + _ = client.create_db_cluster( DBClusterIdentifier="dbci", Engine="mysql", MasterUsername="masterusername", @@ -72,7 +72,7 @@ def test_remove_instance_from_cluster(): def test_add_instance_to_serverless_cluster(): client = boto3.client("rds", "us-east-1") - client.create_db_cluster( + _ = client.create_db_cluster( DBClusterIdentifier="dbci", Engine="aurora", EngineMode="serverless", diff --git a/tests/test_rds/test_rds_event_subscriptions.py b/tests/test_rds/test_rds_event_subscriptions.py index 88bbb804e..7ec29d0ec 100644 --- a/tests/test_rds/test_rds_event_subscriptions.py +++ b/tests/test_rds/test_rds_event_subscriptions.py @@ -1,8 +1,7 @@ import boto3 -import pytest -import sure # noqa # pylint: disable=unused-import - from botocore.exceptions import ClientError +import pytest + from moto import mock_rds from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID @@ -44,16 +43,16 @@ def test_create_event_subscription(): SourceIds=[db_identifier], ).get("EventSubscription") - es["CustSubscriptionId"].should.equal(f"{db_identifier}-events") - es["SnsTopicArn"].should.equal( + assert es["CustSubscriptionId"] == f"{db_identifier}-events" + assert es["SnsTopicArn"] == ( f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic" ) - es["SourceType"].should.equal("db-instance") - es["EventCategoriesList"].should.equal( + assert es["SourceType"] == "db-instance" + assert es["EventCategoriesList"] == ( ["Backup", "Creation", "Deletion", "Failure", "Recovery", "Restoration"] ) - es["SourceIdsList"].should.equal([db_identifier]) - es["Enabled"].should.equal(False) + assert es["SourceIdsList"] == [db_identifier] + assert es["Enabled"] is False @mock_rds @@ -75,8 +74,8 @@ def test_create_event_fail_already_exists(): err = ex.value.response["Error"] - err["Code"].should.equal("SubscriptionAlreadyExistFault") - err["Message"].should.equal("Subscription db-primary-1-events already exists.") + assert err["Code"] == "SubscriptionAlreadyExistFault" + assert err["Message"] == "Subscription db-primary-1-events already exists." @mock_rds @@ -86,8 +85,8 @@ def test_delete_event_subscription_fails_unknown_subscription(): client.delete_event_subscription(SubscriptionName="my-db-events") err = ex.value.response["Error"] - err["Code"].should.equal("SubscriptionNotFoundFault") - err["Message"].should.equal("Subscription my-db-events not found.") + assert err["Code"] == "SubscriptionNotFoundFault" + assert err["Message"] == "Subscription my-db-events not found." @mock_rds @@ -104,8 +103,8 @@ def test_delete_event_subscription(): SubscriptionName=f"{db_identifier}-events" ).get("EventSubscription") - es["CustSubscriptionId"].should.equal(f"{db_identifier}-events") - es["SnsTopicArn"].should.equal( + assert es["CustSubscriptionId"] == f"{db_identifier}-events" + assert es["SnsTopicArn"] == ( f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic" ) @@ -122,8 +121,8 @@ def test_describe_event_subscriptions(): subscriptions = client.describe_event_subscriptions().get("EventSubscriptionsList") - subscriptions.should.have.length_of(1) - subscriptions[0]["CustSubscriptionId"].should.equal(f"{db_identifier}-events") + assert len(subscriptions) == 1 + assert subscriptions[0]["CustSubscriptionId"] == f"{db_identifier}-events" @mock_rds @@ -134,5 +133,5 @@ def test_describe_event_subscriptions_fails_unknown_subscription(): err = ex.value.response["Error"] - err["Code"].should.equal("SubscriptionNotFoundFault") - err["Message"].should.equal("Subscription my-db-events not found.") + assert err["Code"] == "SubscriptionNotFoundFault" + assert err["Message"] == "Subscription my-db-events not found." diff --git a/tests/test_rds/test_rds_export_tasks.py b/tests/test_rds/test_rds_export_tasks.py index 3ef0a3b0c..6193cc9af 100644 --- a/tests/test_rds/test_rds_export_tasks.py +++ b/tests/test_rds/test_rds_export_tasks.py @@ -1,8 +1,7 @@ import boto3 -import pytest -import sure # noqa # pylint: disable=unused-import - from botocore.exceptions import ClientError +import pytest + from moto import mock_rds from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID @@ -63,8 +62,8 @@ def test_start_export_task_fails_unknown_snapshot(): ) err = ex.value.response["Error"] - err["Code"].should.equal("DBSnapshotNotFound") - err["Message"].should.equal("DBSnapshot snapshot-1 not found.") + assert err["Code"] == "DBSnapshotNotFound" + assert err["Message"] == "DBSnapshot snapshot-1 not found." @mock_rds @@ -82,16 +81,16 @@ def test_start_export_task_db(): ExportOnly=["schema.table"], ) - export["ExportTaskIdentifier"].should.equal("export-snapshot-1") - export["SourceArn"].should.equal(source_arn) - export["S3Bucket"].should.equal("export-bucket") - export["S3Prefix"].should.equal("snaps/") - export["IamRoleArn"].should.equal("arn:aws:iam:::role/export-role") - export["KmsKeyId"].should.equal( + assert export["ExportTaskIdentifier"] == "export-snapshot-1" + assert export["SourceArn"] == source_arn + assert export["S3Bucket"] == "export-bucket" + assert export["S3Prefix"] == "snaps/" + assert export["IamRoleArn"] == "arn:aws:iam:::role/export-role" + assert export["KmsKeyId"] == ( "arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE" ) - export["ExportOnly"].should.equal(["schema.table"]) - export["SourceType"].should.equal("SNAPSHOT") + assert export["ExportOnly"] == ["schema.table"] + assert export["SourceType"] == "SNAPSHOT" @mock_rds @@ -109,16 +108,16 @@ def test_start_export_task_db_cluster(): ExportOnly=["schema.table"], ) - export["ExportTaskIdentifier"].should.equal("export-snapshot-1") - export["SourceArn"].should.equal(source_arn) - export["S3Bucket"].should.equal("export-bucket") - export["S3Prefix"].should.equal("snaps/") - export["IamRoleArn"].should.equal("arn:aws:iam:::role/export-role") - export["KmsKeyId"].should.equal( + assert export["ExportTaskIdentifier"] == "export-snapshot-1" + assert export["SourceArn"] == source_arn + assert export["S3Bucket"] == "export-bucket" + assert export["S3Prefix"] == "snaps/" + assert export["IamRoleArn"] == "arn:aws:iam:::role/export-role" + assert export["KmsKeyId"] == ( "arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE" ) - export["ExportOnly"].should.equal(["schema.table"]) - export["SourceType"].should.equal("CLUSTER") + assert export["ExportOnly"] == ["schema.table"] + assert export["SourceType"] == "CLUSTER" @mock_rds @@ -143,9 +142,10 @@ def test_start_export_task_fail_already_exists(): ) err = ex.value.response["Error"] - err["Code"].should.equal("ExportTaskAlreadyExistsFault") - err["Message"].should.equal( - "Cannot start export task because a task with the identifier export-snapshot-1 already exists." + assert err["Code"] == "ExportTaskAlreadyExistsFault" + assert err["Message"] == ( + "Cannot start export task because a task with the identifier " + "export-snapshot-1 already exists." ) @@ -156,9 +156,10 @@ def test_cancel_export_task_fails_unknown_task(): client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1") err = ex.value.response["Error"] - err["Code"].should.equal("ExportTaskNotFoundFault") - err["Message"].should.equal( - "Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist." + assert err["Code"] == "ExportTaskNotFoundFault" + assert err["Message"] == ( + "Cannot cancel export task because a task with the identifier " + "export-snapshot-1 is not exist." ) @@ -177,8 +178,8 @@ def test_cancel_export_task(): export = client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1") - export["ExportTaskIdentifier"].should.equal("export-snapshot-1") - export["Status"].should.equal("canceled") + assert export["ExportTaskIdentifier"] == "export-snapshot-1" + assert export["Status"] == "canceled" @mock_rds @@ -195,8 +196,8 @@ def test_describe_export_tasks(): exports = client.describe_export_tasks().get("ExportTasks") - exports.should.have.length_of(1) - exports[0]["ExportTaskIdentifier"].should.equal("export-snapshot-1") + assert len(exports) == 1 + assert exports[0]["ExportTaskIdentifier"] == "export-snapshot-1" @mock_rds @@ -206,7 +207,8 @@ def test_describe_export_tasks_fails_unknown_task(): client.describe_export_tasks(ExportTaskIdentifier="export-snapshot-1") err = ex.value.response["Error"] - err["Code"].should.equal("ExportTaskNotFoundFault") - err["Message"].should.equal( - "Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist." + assert err["Code"] == "ExportTaskNotFoundFault" + assert err["Message"] == ( + "Cannot cancel export task because a task with the identifier " + "export-snapshot-1 is not exist." ) diff --git a/tests/test_rds/test_server.py b/tests/test_rds/test_server.py index e306eeb3a..9f6a0fbcd 100644 --- a/tests/test_rds/test_server.py +++ b/tests/test_rds/test_server.py @@ -1,6 +1,5 @@ from urllib.parse import urlencode import moto.server as server -import sure # noqa # pylint: disable=unused-import def test_list_databases(): @@ -9,7 +8,7 @@ def test_list_databases(): res = test_client.get("/?Action=DescribeDBInstances") - res.data.decode("utf-8").should.contain("") + assert "" in res.data.decode("utf-8") def test_create_db_instance(): @@ -28,10 +27,10 @@ def test_create_db_instance(): resp = test_client.post(f"/?{qs}") response = resp.data.decode("utf-8") - response.shouldnt.contain("") - response.should.contain("hi" not in response + assert "hifalse") - response.should.contain( + assert "false" in response + assert ( "false" - ) + ) in response diff --git a/tests/test_rds/test_utils.py b/tests/test_rds/test_utils.py index c496da601..0642eec3a 100644 --- a/tests/test_rds/test_utils.py +++ b/tests/test_rds/test_utils.py @@ -11,7 +11,7 @@ from moto.rds.utils import ( ) -class TestFilterValidation(object): +class TestFilterValidation: @classmethod def setup_class(cls): cls.filter_defs = { @@ -37,13 +37,13 @@ class TestFilterValidation(object): validate_filters(filters, self.filter_defs) -class Resource(object): +class Resource: def __init__(self, identifier, **kwargs): self.identifier = identifier self.__dict__.update(kwargs) -class TestResourceFiltering(object): +class TestResourceFiltering: @classmethod def setup_class(cls): cls.filter_defs = { @@ -64,31 +64,31 @@ class TestResourceFiltering(object): def test_filtering_on_nested_attribute(self): filters = {"nested-resource": ["nested-id-1"]} filtered_resources = apply_filter(self.resources, filters, self.filter_defs) - filtered_resources.should.have.length_of(1) - filtered_resources.should.have.key("identifier-3") + assert len(filtered_resources) == 1 + assert "identifier-3" in filtered_resources def test_filtering_on_common_attribute(self): filters = {"common-attr": ["common"]} filtered_resources = apply_filter(self.resources, filters, self.filter_defs) - filtered_resources.should.have.length_of(2) - filtered_resources.should.have.key("identifier-1") - filtered_resources.should.have.key("identifier-4") + assert len(filtered_resources) == 2 + assert "identifier-1" in filtered_resources + assert "identifier-4" in filtered_resources def test_filtering_on_multiple_attributes(self): filters = {"multiple-attrs": ["common"]} filtered_resources = apply_filter(self.resources, filters, self.filter_defs) - filtered_resources.should.have.length_of(3) - filtered_resources.should.have.key("identifier-1") - filtered_resources.should.have.key("identifier-4") - filtered_resources.should.have.key("identifier-5") + assert len(filtered_resources) == 3 + assert "identifier-1" in filtered_resources + assert "identifier-4" in filtered_resources + assert "identifier-5" in filtered_resources def test_filters_with_multiple_values(self): filters = {"identifier": ["identifier-0", "identifier-3", "identifier-5"]} filtered_resources = apply_filter(self.resources, filters, self.filter_defs) - filtered_resources.should.have.length_of(3) - filtered_resources.should.have.key("identifier-0") - filtered_resources.should.have.key("identifier-3") - filtered_resources.should.have.key("identifier-5") + assert len(filtered_resources) == 3 + assert "identifier-0" in filtered_resources + assert "identifier-3" in filtered_resources + assert "identifier-5" in filtered_resources def test_multiple_filters(self): filters = { @@ -97,11 +97,11 @@ class TestResourceFiltering(object): "multiple-attrs": ["common"], } filtered_resources = apply_filter(self.resources, filters, self.filter_defs) - filtered_resources.should.have.length_of(1) - filtered_resources.should.have.key("identifier-1") + assert len(filtered_resources) == 1 + assert "identifier-1" in filtered_resources -class TestMergingFilters(object): +class TestMergingFilters: def test_when_filters_to_update_is_none(self): filters_to_update = {"filter-name": ["value1"]} merged = merge_filters(filters_to_update, None) @@ -135,12 +135,15 @@ class TestMergingFilters(object): } merged = merge_filters(filters_to_update, filters_to_merge) assert len(merged.keys()) == 4 - for key in merged.keys(): - assert merged[key] == ["value1", "value2"] + for value in merged.values(): + assert value == ["value1", "value2"] def test_encode_orderable_db_instance(): - # Data from AWS comes in a specific format. Verify we can encode/decode it to something more compact + """Verify the data can be encoded/decoded to something more compact. + + Data from AWS comes in a specific format. + """ original = { "Engine": "neptune", "EngineVersion": "1.0.3.0", @@ -173,11 +176,15 @@ def test_encode_orderable_db_instance(): "Support edNetworkTypes": ["IPV4"], } short = encode_orderable_db_instance(original) - decode_orderable_db_instance(short).should.equal(original) + assert decode_orderable_db_instance(short) == original def test_encode_orderable_db_instance__short_format(): - # Verify this works in a random format. We don't know for sure what AWS returns, so it should always work regardless of the input + """Verify this works in a random format. + + We don't know for sure what AWS returns, so it should always work + regardless of the input. + """ short = { "Engine": "neptune", "EngineVersion": "1.0.3.0", @@ -190,12 +197,10 @@ def test_encode_orderable_db_instance__short_format(): "SupportsClusters": True, "SupportedNetworkTypes": ["IPV4"], } - decode_orderable_db_instance(encode_orderable_db_instance(short)).should.equal( - short - ) + assert decode_orderable_db_instance(encode_orderable_db_instance(short)) == short def test_verify_encoding_is_unique(): - len(set(ORDERABLE_DB_INSTANCE_ENCODING.values())).should.equal( - len(ORDERABLE_DB_INSTANCE_ENCODING.keys()) + assert len(set(ORDERABLE_DB_INSTANCE_ENCODING.values())) == len( + ORDERABLE_DB_INSTANCE_ENCODING.keys() )