Techdebt: Replace sure with regular assertions in RDS (#6683)

This commit is contained in:
kbalk 2023-08-17 03:42:19 -04:00 committed by GitHub
parent 28743bdbe7
commit 1fe69d55a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 810 additions and 769 deletions

View File

@ -1,7 +1,7 @@
import boto3 import boto3
from botocore.exceptions import ClientError
import pytest import pytest
from botocore.exceptions import ClientError
from moto import mock_rds from moto import mock_rds
from moto.core import DEFAULT_ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID

View File

@ -1,7 +1,6 @@
import boto3 import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest
from moto import mock_rds from moto import mock_rds
@ -37,8 +36,8 @@ class TestDBInstanceFilters:
self.client.describe_db_instances( self.client.describe_db_instances(
Filters=[{"Name": "invalid-filter-name", "Values": []}] Filters=[{"Name": "invalid-filter-name", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"Unrecognized filter name: invalid-filter-name" "Unrecognized filter name: invalid-filter-name"
) )
@ -47,8 +46,8 @@ class TestDBInstanceFilters:
self.client.describe_db_instances( self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": []}] Filters=[{"Name": "db-instance-id", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination"
ex.value.response["Error"]["Message"].should.contain("must not be empty") assert "must not be empty" in ex.value.response["Error"]["Message"]
def test_db_cluster_id_filter(self): def test_db_cluster_id_filter(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -57,8 +56,8 @@ class TestDBInstanceFilters:
db_instances = self.client.describe_db_instances( db_instances = self.client.describe_db_instances(
Filters=[{"Name": "db-cluster-id", "Values": [db_cluster_identifier]}] Filters=[{"Name": "db-cluster-id", "Values": [db_cluster_identifier]}]
).get("DBInstances") ).get("DBInstances")
db_instances.should.have.length_of(1) assert len(db_instances) == 1
db_instances[0]["DBClusterIdentifier"].should.equal(db_cluster_identifier) assert db_instances[0]["DBClusterIdentifier"] == db_cluster_identifier
def test_db_instance_id_filter(self): def test_db_instance_id_filter(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -67,8 +66,8 @@ class TestDBInstanceFilters:
db_instances = self.client.describe_db_instances( db_instances = self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}] Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}]
).get("DBInstances") ).get("DBInstances")
db_instances.should.have.length_of(1) assert len(db_instances) == 1
db_instances[0]["DBInstanceIdentifier"].should.equal(db_instance_identifier) assert db_instances[0]["DBInstanceIdentifier"] == db_instance_identifier
def test_db_instance_id_filter_works_with_arns(self): def test_db_instance_id_filter_works_with_arns(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -77,8 +76,8 @@ class TestDBInstanceFilters:
db_instances = self.client.describe_db_instances( db_instances = self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}] Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}]
).get("DBInstances") ).get("DBInstances")
db_instances.should.have.length_of(1) assert len(db_instances) == 1
db_instances[0]["DBInstanceArn"].should.equal(db_instance_arn) assert db_instances[0]["DBInstanceArn"] == db_instance_arn
def test_dbi_resource_id_filter(self): def test_dbi_resource_id_filter(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -88,19 +87,19 @@ class TestDBInstanceFilters:
Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}] Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}]
).get("DBInstances") ).get("DBInstances")
for db_instance in db_instances: for db_instance in db_instances:
db_instance["DbiResourceId"].should.equal(dbi_resource_identifier) assert db_instance["DbiResourceId"] == dbi_resource_identifier
def test_engine_filter(self): def test_engine_filter(self):
db_instances = self.client.describe_db_instances( db_instances = self.client.describe_db_instances(
Filters=[{"Name": "engine", "Values": ["postgres"]}] Filters=[{"Name": "engine", "Values": ["postgres"]}]
).get("DBInstances") ).get("DBInstances")
for db_instance in db_instances: for db_instance in db_instances:
db_instance["Engine"].should.equal("postgres") assert db_instance["Engine"] == "postgres"
db_instances = self.client.describe_db_instances( db_instances = self.client.describe_db_instances(
Filters=[{"Name": "engine", "Values": ["oracle"]}] Filters=[{"Name": "engine", "Values": ["oracle"]}]
).get("DBInstances") ).get("DBInstances")
db_instances.should.have.length_of(0) assert len(db_instances) == 0
def test_multiple_filters(self): def test_multiple_filters(self):
resp = self.client.describe_db_instances( resp = self.client.describe_db_instances(
@ -115,9 +114,9 @@ class TestDBInstanceFilters:
returned_identifiers = [ returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"] db["DBInstanceIdentifier"] for db in resp["DBInstances"]
] ]
returned_identifiers.should.have.length_of(2) assert len(returned_identifiers) == 2
"db-instance-0".should.be.within(returned_identifiers) assert "db-instance-0" in returned_identifiers
"db-instance-3".should.be.within(returned_identifiers) assert "db-instance-3" in returned_identifiers
def test_invalid_db_instance_identifier_with_exclusive_filter(self): def test_invalid_db_instance_identifier_with_exclusive_filter(self):
# Passing a non-existent DBInstanceIdentifier will not raise an error # Passing a non-existent DBInstanceIdentifier will not raise an error
@ -126,8 +125,8 @@ class TestDBInstanceFilters:
DBInstanceIdentifier="non-existent", DBInstanceIdentifier="non-existent",
Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}], Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}],
) )
resp["DBInstances"].should.have.length_of(1) assert len(resp["DBInstances"]) == 1
resp["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-instance-1") assert resp["DBInstances"][0]["DBInstanceIdentifier"] == "db-instance-1"
def test_invalid_db_instance_identifier_with_non_matching_filter(self): def test_invalid_db_instance_identifier_with_non_matching_filter(self):
# Passing a non-existent DBInstanceIdentifier will raise an error if # Passing a non-existent DBInstanceIdentifier will raise an error if
@ -137,8 +136,8 @@ class TestDBInstanceFilters:
DBInstanceIdentifier="non-existent", DBInstanceIdentifier="non-existent",
Filters=[{"Name": "engine", "Values": ["mysql"]}], Filters=[{"Name": "engine", "Values": ["mysql"]}],
) )
ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound") assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"DBInstance non-existent not found." "DBInstance non-existent not found."
) )
@ -155,8 +154,8 @@ class TestDBInstanceFilters:
returned_identifiers = [ returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"] db["DBInstanceIdentifier"] for db in resp["DBInstances"]
] ]
"db-instance-0".should_not.be.within(returned_identifiers) assert "db-instance-0" not in returned_identifiers
"db-instance-1".should.be.within(returned_identifiers) assert "db-instance-1" in returned_identifiers
def test_valid_db_instance_identifier_with_inclusive_filter(self): def test_valid_db_instance_identifier_with_inclusive_filter(self):
# Passing a valid DBInstanceIdentifier with a filter it matches but also # Passing a valid DBInstanceIdentifier with a filter it matches but also
@ -171,8 +170,8 @@ class TestDBInstanceFilters:
returned_identifiers = [ returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"] db["DBInstanceIdentifier"] for db in resp["DBInstances"]
] ]
"db-instance-0".should.be.within(returned_identifiers) assert "db-instance-0" in returned_identifiers
"db-instance-1".should.be.within(returned_identifiers) assert "db-instance-1" in returned_identifiers
def test_valid_db_instance_identifier_with_non_matching_filter(self): def test_valid_db_instance_identifier_with_non_matching_filter(self):
# Passing a valid DBInstanceIdentifier will raise an error if the # Passing a valid DBInstanceIdentifier will raise an error if the
@ -182,8 +181,8 @@ class TestDBInstanceFilters:
DBInstanceIdentifier="db-instance-0", DBInstanceIdentifier="db-instance-0",
Filters=[{"Name": "engine", "Values": ["postgres"]}], Filters=[{"Name": "engine", "Values": ["postgres"]}],
) )
ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound") assert ex.value.response["Error"]["Code"] == "DBInstanceNotFound"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"DBInstance db-instance-0 not found." "DBInstance db-instance-0 not found."
) )
@ -224,8 +223,8 @@ class TestDBSnapshotFilters:
self.client.describe_db_snapshots( self.client.describe_db_snapshots(
Filters=[{"Name": "invalid-filter-name", "Values": []}] Filters=[{"Name": "invalid-filter-name", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"Unrecognized filter name: invalid-filter-name" "Unrecognized filter name: invalid-filter-name"
) )
@ -234,15 +233,15 @@ class TestDBSnapshotFilters:
self.client.describe_db_snapshots( self.client.describe_db_snapshots(
Filters=[{"Name": "db-snapshot-id", "Values": []}] Filters=[{"Name": "db-snapshot-id", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination"
ex.value.response["Error"]["Message"].should.contain("must not be empty") assert "must not be empty" in ex.value.response["Error"]["Message"]
def test_db_snapshot_id_filter(self): def test_db_snapshot_id_filter(self):
snapshots = self.client.describe_db_snapshots( snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-0"]}] Filters=[{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-0"]}]
).get("DBSnapshots") ).get("DBSnapshots")
snapshots.should.have.length_of(1) assert len(snapshots) == 1
snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-1-snapshot-0") assert snapshots[0]["DBSnapshotIdentifier"] == "db-instance-1-snapshot-0"
def test_db_instance_id_filter(self): def test_db_instance_id_filter(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -252,7 +251,7 @@ class TestDBSnapshotFilters:
Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}] Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}]
).get("DBSnapshots") ).get("DBSnapshots")
for snapshot in snapshots: for snapshot in snapshots:
snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier) assert snapshot["DBInstanceIdentifier"] == db_instance_identifier
def test_db_instance_id_filter_works_with_arns(self): def test_db_instance_id_filter_works_with_arns(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -263,7 +262,7 @@ class TestDBSnapshotFilters:
Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}] Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}]
).get("DBSnapshots") ).get("DBSnapshots")
for snapshot in snapshots: for snapshot in snapshots:
snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier) assert snapshot["DBInstanceIdentifier"] == db_instance_identifier
def test_dbi_resource_id_filter(self): def test_dbi_resource_id_filter(self):
resp = self.client.describe_db_instances() resp = self.client.describe_db_instances()
@ -273,19 +272,19 @@ class TestDBSnapshotFilters:
Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}] Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}]
).get("DBSnapshots") ).get("DBSnapshots")
for snapshot in snapshots: for snapshot in snapshots:
snapshot["DbiResourceId"].should.equal(dbi_resource_identifier) assert snapshot["DbiResourceId"] == dbi_resource_identifier
def test_engine_filter(self): def test_engine_filter(self):
snapshots = self.client.describe_db_snapshots( snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "engine", "Values": ["postgres"]}] Filters=[{"Name": "engine", "Values": ["postgres"]}]
).get("DBSnapshots") ).get("DBSnapshots")
for snapshot in snapshots: for snapshot in snapshots:
snapshot["Engine"].should.equal("postgres") assert snapshot["Engine"] == "postgres"
snapshots = self.client.describe_db_snapshots( snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "engine", "Values": ["oracle"]}] Filters=[{"Name": "engine", "Values": ["oracle"]}]
).get("DBSnapshots") ).get("DBSnapshots")
snapshots.should.have.length_of(0) assert len(snapshots) == 0
def test_snapshot_type_filter(self): def test_snapshot_type_filter(self):
snapshots = self.client.describe_db_snapshots( snapshots = self.client.describe_db_snapshots(
@ -310,8 +309,8 @@ class TestDBSnapshotFilters:
{"Name": "engine", "Values": ["mysql"]}, {"Name": "engine", "Values": ["mysql"]},
] ]
).get("DBSnapshots") ).get("DBSnapshots")
snapshots.should.have.length_of(1) assert len(snapshots) == 1
snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-0-snapshot-1") assert snapshots[0]["DBSnapshotIdentifier"] == "db-instance-0-snapshot-1"
def test_invalid_snapshot_id_with_db_instance_id_and_filter(self): def test_invalid_snapshot_id_with_db_instance_id_and_filter(self):
# Passing a non-existent DBSnapshotIdentifier will return an empty list # Passing a non-existent DBSnapshotIdentifier will return an empty list
@ -321,7 +320,7 @@ class TestDBSnapshotFilters:
DBInstanceIdentifier="a-db-instance-identifier", DBInstanceIdentifier="a-db-instance-identifier",
Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}], Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}],
) )
resp["DBSnapshots"].should.have.length_of(0) assert len(resp["DBSnapshots"]) == 0
def test_invalid_snapshot_id_with_non_matching_filter(self): def test_invalid_snapshot_id_with_non_matching_filter(self):
# Passing a non-existent DBSnapshotIdentifier will raise an error if # Passing a non-existent DBSnapshotIdentifier will raise an error if
@ -331,8 +330,8 @@ class TestDBSnapshotFilters:
DBSnapshotIdentifier="non-existent", DBSnapshotIdentifier="non-existent",
Filters=[{"Name": "engine", "Values": ["oracle"]}], Filters=[{"Name": "engine", "Values": ["oracle"]}],
) )
ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound") assert ex.value.response["Error"]["Code"] == "DBSnapshotNotFound"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"DBSnapshot non-existent not found." "DBSnapshot non-existent not found."
) )
@ -347,8 +346,8 @@ class TestDBSnapshotFilters:
{"Name": "engine", "Values": ["postgres"]}, {"Name": "engine", "Values": ["postgres"]},
], ],
) )
resp["DBSnapshots"].should.have.length_of(1) assert len(resp["DBSnapshots"]) == 1
resp["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal( assert resp["DBSnapshots"][0]["DBSnapshotIdentifier"] == (
"db-instance-1-snapshot-1" "db-instance-1-snapshot-1"
) )
@ -367,9 +366,9 @@ class TestDBSnapshotFilters:
], ],
).get("DBSnapshots") ).get("DBSnapshots")
returned_identifiers = [ss["DBSnapshotIdentifier"] for ss in snapshots] returned_identifiers = [ss["DBSnapshotIdentifier"] for ss in snapshots]
returned_identifiers.should.have.length_of(2) assert len(returned_identifiers) == 2
"db-instance-0-snapshot-0".should.be.within(returned_identifiers) assert "db-instance-0-snapshot-0" in returned_identifiers
"db-instance-1-snapshot-1".should.be.within(returned_identifiers) assert "db-instance-1-snapshot-1" in returned_identifiers
def test_valid_snapshot_id_with_non_matching_filter(self): def test_valid_snapshot_id_with_non_matching_filter(self):
# Passing a valid DBSnapshotIdentifier will raise an error if the # Passing a valid DBSnapshotIdentifier will raise an error if the
@ -379,8 +378,8 @@ class TestDBSnapshotFilters:
DBSnapshotIdentifier="db-instance-0-snapshot-0", DBSnapshotIdentifier="db-instance-0-snapshot-0",
Filters=[{"Name": "engine", "Values": ["postgres"]}], Filters=[{"Name": "engine", "Values": ["postgres"]}],
) )
ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound") assert ex.value.response["Error"]["Code"] == "DBSnapshotNotFound"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"DBSnapshot db-instance-0-snapshot-0 not found." "DBSnapshot db-instance-0-snapshot-0 not found."
) )
@ -422,8 +421,8 @@ class TestDBClusterSnapshotFilters:
self.client.describe_db_cluster_snapshots( self.client.describe_db_cluster_snapshots(
Filters=[{"Name": "invalid-filter-name", "Values": []}] Filters=[{"Name": "invalid-filter-name", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue") assert ex.value.response["Error"]["Code"] == "InvalidParameterValue"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == (
"Unrecognized filter name: invalid-filter-name" "Unrecognized filter name: invalid-filter-name"
) )
@ -432,8 +431,8 @@ class TestDBClusterSnapshotFilters:
self.client.describe_db_cluster_snapshots( self.client.describe_db_cluster_snapshots(
Filters=[{"Name": "snapshot-type", "Values": []}] Filters=[{"Name": "snapshot-type", "Values": []}]
) )
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination") assert ex.value.response["Error"]["Code"] == "InvalidParameterCombination"
ex.value.response["Error"]["Message"].should.contain("must not be empty") assert "must not be empty" in ex.value.response["Error"]["Message"]
def test_snapshot_type_filter(self): def test_snapshot_type_filter(self):
snapshots = self.client.describe_db_cluster_snapshots( snapshots = self.client.describe_db_cluster_snapshots(

View File

@ -1,7 +1,7 @@
import boto3 import boto3
from botocore.exceptions import ClientError
import pytest import pytest
from botocore.exceptions import ClientError
from moto import mock_rds from moto import mock_rds
from moto.core import DEFAULT_ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID
@ -65,7 +65,8 @@ def test_global_cluster_members():
@mock_rds @mock_rds
def test_create_global_cluster_from_regular_cluster(): def test_create_global_cluster_from_regular_cluster():
# WHEN create_db_cluster is called # WHEN create_db_cluster is called
# AND create_global_cluster is called with SourceDBClusterIdentifier set as the earlier created db cluster # AND create_global_cluster is called with SourceDBClusterIdentifier
# set as the earlier created db cluster
# THEN that db cluster is elevated to a global cluster # THEN that db cluster is elevated to a global cluster
# AND it still shows up when calling describe_db_clusters # AND it still shows up when calling describe_db_clusters
client = boto3.client("rds", "us-east-1") client = boto3.client("rds", "us-east-1")
@ -184,9 +185,10 @@ def test_create_global_cluster_from_regular_cluster__and_specify_engine():
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterCombination" assert err["Code"] == "InvalidParameterCombination"
assert ( assert err["Message"] == (
err["Message"] "When creating global cluster from existing db cluster, value for "
== "When creating global cluster from existing db cluster, value for engineName should not be specified since it will be inherited from source cluster" "engineName should not be specified since it will be inherited "
"from source cluster"
) )
@ -195,11 +197,13 @@ def test_delete_non_global_cluster():
# WHEN a global cluster contains a regular cluster # WHEN a global cluster contains a regular cluster
# AND we attempt to delete the global cluster # AND we attempt to delete the global cluster
# THEN we get an error message # THEN we get an error message
# An error occurs (InvalidGlobalClusterStateFault) when calling the DeleteGlobalCluster operation: Global Cluster arn:aws:rds::486285699788:global-cluster:g1 is not empty # An error occurs (InvalidGlobalClusterStateFault) when calling the
# DeleteGlobalCluster operation: Global Cluster
# arn:aws:rds::486285699788:global-cluster:g1 is not empty
client = boto3.client("rds", "us-east-1") client = boto3.client("rds", "us-east-1")
client.create_global_cluster(GlobalClusterIdentifier="gc1", Engine="aurora-mysql") client.create_global_cluster(GlobalClusterIdentifier="gc1", Engine="aurora-mysql")
client.create_db_cluster( _ = client.create_db_cluster(
DBClusterIdentifier="dbci", DBClusterIdentifier="dbci",
GlobalClusterIdentifier="gc1", GlobalClusterIdentifier="gc1",
Engine="mysql", Engine="mysql",

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
import boto3
import json import json
import sure # noqa # pylint: disable=unused-import
import boto3
from moto import mock_cloudformation, mock_ec2, mock_rds from moto import mock_cloudformation, mock_ec2, mock_rds
from tests.test_cloudformation.fixtures import rds_mysql_with_db_parameter_group from tests.test_cloudformation.fixtures import rds_mysql_with_db_parameter_group
from tests.test_cloudformation.fixtures import rds_mysql_with_read_replica from tests.test_cloudformation.fixtures import rds_mysql_with_read_replica
@ -36,12 +37,12 @@ def test_create_subnetgroup_via_cf():
cf.create_stack(StackName="test_stack", TemplateBody=template_json) cf.create_stack(StackName="test_stack", TemplateBody=template_json)
response = rds.describe_db_subnet_groups()["DBSubnetGroups"] response = rds.describe_db_subnet_groups()["DBSubnetGroups"]
response.should.have.length_of(1) assert len(response) == 1
created_subnet = response[0] created_subnet = response[0]
created_subnet.should.have.key("DBSubnetGroupName").equal("subnetgroupname") assert created_subnet["DBSubnetGroupName"] == "subnetgroupname"
created_subnet.should.have.key("DBSubnetGroupDescription").equal("subnetgroupdesc") assert created_subnet["DBSubnetGroupDescription"] == "subnetgroupdesc"
created_subnet.should.have.key("VpcId").equal(vpc["VpcId"]) assert created_subnet["VpcId"] == vpc["VpcId"]
@mock_ec2 @mock_ec2
@ -82,19 +83,19 @@ def test_create_dbinstance_via_cf():
db_instance_identifier = summaries[0]["PhysicalResourceId"] db_instance_identifier = summaries[0]["PhysicalResourceId"]
resp = rds.describe_db_instances()["DBInstances"] resp = rds.describe_db_instances()["DBInstances"]
resp.should.have.length_of(1) assert len(resp) == 1
created = resp[0] created = resp[0]
created["DBInstanceIdentifier"].should.equal(db_instance_identifier) assert created["DBInstanceIdentifier"] == db_instance_identifier
created["Engine"].should.equal("mysql") assert created["Engine"] == "mysql"
created["DBInstanceStatus"].should.equal("available") assert created["DBInstanceStatus"] == "available"
# Verify the stack outputs are correct # Verify the stack outputs are correct
o = _get_stack_outputs(cf, stack_name="test_stack") o = _get_stack_outputs(cf, stack_name="test_stack")
o.should.have.key("db_address").equals( assert o["db_address"] == (
f"{db_instance_identifier}.aaaaaaaaaa.us-west-2.rds.amazonaws.com" f"{db_instance_identifier}.aaaaaaaaaa.us-west-2.rds.amazonaws.com"
) )
o.should.have.key("db_port").equals("3307") assert o["db_port"] == "3307"
@mock_ec2 @mock_ec2
@ -121,10 +122,10 @@ def test_create_dbsecuritygroup_via_cf():
cf.create_stack(StackName="test_stack", TemplateBody=template_json) cf.create_stack(StackName="test_stack", TemplateBody=template_json)
result = rds.describe_db_security_groups()["DBSecurityGroups"] result = rds.describe_db_security_groups()["DBSecurityGroups"]
result.should.have.length_of(1) assert len(result) == 1
created = result[0] created = result[0]
created["DBSecurityGroupDescription"].should.equal("my sec group") assert created["DBSecurityGroupDescription"] == "my sec group"
@mock_cloudformation @mock_cloudformation
@ -159,7 +160,7 @@ def test_rds_db_parameter_groups():
rds_conn = boto3.client("rds", region_name="us-west-1") rds_conn = boto3.client("rds", region_name="us-west-1")
db_parameter_groups = rds_conn.describe_db_parameter_groups() db_parameter_groups = rds_conn.describe_db_parameter_groups()
db_parameter_groups["DBParameterGroups"].should.have.length_of(1) assert len(db_parameter_groups["DBParameterGroups"]) == 1
db_parameter_group_name = db_parameter_groups["DBParameterGroups"][0][ db_parameter_group_name = db_parameter_groups["DBParameterGroups"][0][
"DBParameterGroupName" "DBParameterGroupName"
] ]
@ -174,7 +175,7 @@ def test_rds_db_parameter_groups():
): ):
found_cloudformation_set_parameter = True found_cloudformation_set_parameter = True
found_cloudformation_set_parameter.should.equal(True) assert found_cloudformation_set_parameter is True
@mock_cloudformation @mock_cloudformation
@ -260,7 +261,7 @@ def test_rds_mysql_with_read_replica_in_vpc():
subnet_group = rds.describe_db_subnet_groups(DBSubnetGroupName=subnet_group_name)[ subnet_group = rds.describe_db_subnet_groups(DBSubnetGroupName=subnet_group_name)[
"DBSubnetGroups" "DBSubnetGroups"
][0] ][0]
subnet_group.should.have.key("DBSubnetGroupDescription").equal("my db subnet group") assert subnet_group["DBSubnetGroupDescription"] == "my db subnet group"
@mock_ec2 @mock_ec2
@ -292,12 +293,12 @@ def test_delete_dbinstance_via_cf():
cf.create_stack(StackName="test_stack", TemplateBody=template_json) cf.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = rds.describe_db_instances()["DBInstances"] resp = rds.describe_db_instances()["DBInstances"]
resp.should.have.length_of(1) assert len(resp) == 1
cf.delete_stack(StackName="test_stack") cf.delete_stack(StackName="test_stack")
resp = rds.describe_db_instances()["DBInstances"] resp = rds.describe_db_instances()["DBInstances"]
resp.should.have.length_of(0) assert len(resp) == 0
def _get_stack_outputs(cf_client, stack_name): def _get_stack_outputs(cf_client, stack_name):

View File

@ -1,8 +1,9 @@
import boto3 import re
import pytest
import sure # noqa # pylint: disable=unused-import
import boto3
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest
from moto import mock_rds from moto import mock_rds
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -12,7 +13,7 @@ def test_describe_db_cluster_initial():
client = boto3.client("rds", region_name="eu-north-1") client = boto3.client("rds", region_name="eu-north-1")
resp = client.describe_db_clusters() resp = client.describe_db_clusters()
resp.should.have.key("DBClusters").should.have.length_of(0) assert len(resp["DBClusters"]) == 0
@mock_rds @mock_rds
@ -20,12 +21,12 @@ def test_describe_db_cluster_fails_for_non_existent_cluster():
client = boto3.client("rds", region_name="eu-north-1") client = boto3.client("rds", region_name="eu-north-1")
resp = client.describe_db_clusters() resp = client.describe_db_clusters()
resp.should.have.key("DBClusters").should.have.length_of(0) assert len(resp["DBClusters"]) == 0
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.describe_db_clusters(DBClusterIdentifier="cluster-id") client.describe_db_clusters(DBClusterIdentifier="cluster-id")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault") assert err["Code"] == "DBClusterNotFoundFault"
err["Message"].should.equal("DBCluster cluster-id not found.") assert err["Message"] == "DBCluster cluster-id not found."
@mock_rds @mock_rds
@ -35,8 +36,8 @@ def test_create_db_cluster_needs_master_username():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="aurora") client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="aurora")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert err["Message"] == (
"The parameter MasterUsername must be provided and must not be blank." "The parameter MasterUsername must be provided and must not be blank."
) )
@ -50,8 +51,8 @@ def test_create_db_cluster_needs_master_user_password():
DBClusterIdentifier="cluster-id", Engine="aurora", MasterUsername="root" DBClusterIdentifier="cluster-id", Engine="aurora", MasterUsername="root"
) )
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert err["Message"] == (
"The parameter MasterUserPassword must be provided and must not be blank." "The parameter MasterUserPassword must be provided and must not be blank."
) )
@ -68,9 +69,10 @@ def test_create_db_cluster_needs_long_master_user_password():
MasterUserPassword="hunter2", MasterUserPassword="hunter2",
) )
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert err["Message"] == (
"The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters." "The parameter MasterUserPassword is not a valid password because "
"it is shorter than 8 characters."
) )
@ -91,9 +93,10 @@ def test_modify_db_cluster_needs_long_master_user_password():
MasterUserPassword="hunter2", MasterUserPassword="hunter2",
) )
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue") assert err["Code"] == "InvalidParameterValue"
err["Message"].should.equal( assert err["Message"] == (
"The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters." "The parameter MasterUserPassword is not a valid password because "
"it is shorter than 8 characters."
) )
@ -116,7 +119,7 @@ def test_modify_db_cluster_new_cluster_identifier():
MasterUserPassword="hunter21", MasterUserPassword="hunter21",
) )
resp["DBCluster"].should.have.key("DBClusterIdentifier").equal(new_id) assert resp["DBCluster"]["DBClusterIdentifier"] == new_id
clusters = [ clusters = [
cluster["DBClusterIdentifier"] cluster["DBClusterIdentifier"]
@ -136,61 +139,61 @@ def test_create_db_cluster__verify_default_properties():
MasterUsername="root", MasterUsername="root",
MasterUserPassword="hunter2_", MasterUserPassword="hunter2_",
) )
resp.should.have.key("DBCluster") assert "DBCluster" in resp
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
cluster.shouldnt.have.key( # This was not supplied, so should not be returned
"DatabaseName" assert "DatabaseName" not in cluster
) # This was not supplied, so should not be returned
cluster.should.have.key("AvailabilityZones") assert "AvailabilityZones" in cluster
set(cluster["AvailabilityZones"]).should.equal( assert set(cluster["AvailabilityZones"]) == {
{"eu-north-1a", "eu-north-1b", "eu-north-1c"} "eu-north-1a",
) "eu-north-1b",
cluster.should.have.key("BackupRetentionPeriod").equal(1) "eu-north-1c",
cluster.should.have.key("DBClusterIdentifier").equal("cluster-id") }
cluster.should.have.key("DBClusterParameterGroup").equal("default.aurora8.0") assert cluster["BackupRetentionPeriod"] == 1
cluster.should.have.key("DBSubnetGroup").equal("default") assert cluster["DBClusterIdentifier"] == "cluster-id"
cluster.should.have.key("Status").equal("creating") assert cluster["DBClusterParameterGroup"] == "default.aurora8.0"
cluster.should.have.key("Endpoint").match( assert cluster["DBSubnetGroup"] == "default"
"cluster-id.cluster-[a-z0-9]{12}.eu-north-1.rds.amazonaws.com" assert cluster["Status"] == "creating"
assert re.match(
"cluster-id.cluster-[a-z0-9]{12}.eu-north-1.rds.amazonaws.com",
cluster["Endpoint"],
) )
endpoint = cluster["Endpoint"] endpoint = cluster["Endpoint"]
expected_readonly = endpoint.replace( expected_readonly = endpoint.replace(
"cluster-id.cluster-", "cluster-id.cluster-ro-" "cluster-id.cluster-", "cluster-id.cluster-ro-"
) )
cluster.should.have.key("ReaderEndpoint").equal(expected_readonly) assert cluster["ReaderEndpoint"] == expected_readonly
cluster.should.have.key("MultiAZ").equal(False) assert cluster["MultiAZ"] is False
cluster.should.have.key("Engine").equal("aurora") assert cluster["Engine"] == "aurora"
cluster.should.have.key("EngineVersion").equal("5.6.mysql_aurora.1.22.5") assert cluster["EngineVersion"] == "5.6.mysql_aurora.1.22.5"
cluster.should.have.key("Port").equal(3306) assert cluster["Port"] == 3306
cluster.should.have.key("MasterUsername").equal("root") assert cluster["MasterUsername"] == "root"
cluster.should.have.key("PreferredBackupWindow").equal("01:37-02:07") assert cluster["PreferredBackupWindow"] == "01:37-02:07"
cluster.should.have.key("PreferredMaintenanceWindow").equal("wed:02:40-wed:03:10") assert cluster["PreferredMaintenanceWindow"] == "wed:02:40-wed:03:10"
cluster.should.have.key("ReadReplicaIdentifiers").equal([]) assert cluster["ReadReplicaIdentifiers"] == []
cluster.should.have.key("DBClusterMembers").equal([]) assert cluster["DBClusterMembers"] == []
cluster.should.have.key("VpcSecurityGroups") assert "VpcSecurityGroups" in cluster
cluster.should.have.key("HostedZoneId") assert "HostedZoneId" in cluster
cluster.should.have.key("StorageEncrypted").equal(False) assert cluster["StorageEncrypted"] is False
cluster.should.have.key("DbClusterResourceId").match(r"cluster-[A-Z0-9]{26}") assert re.match(r"cluster-[A-Z0-9]{26}", cluster["DbClusterResourceId"])
cluster.should.have.key("DBClusterArn").equal( assert cluster["DBClusterArn"] == (
f"arn:aws:rds:eu-north-1:{ACCOUNT_ID}:cluster:cluster-id" f"arn:aws:rds:eu-north-1:{ACCOUNT_ID}:cluster:cluster-id"
) )
cluster.should.have.key("AssociatedRoles").equal([]) assert cluster["AssociatedRoles"] == []
cluster.should.have.key("IAMDatabaseAuthenticationEnabled").equal(False) assert cluster["IAMDatabaseAuthenticationEnabled"] is False
cluster.should.have.key("EngineMode").equal("provisioned") assert cluster["EngineMode"] == "provisioned"
cluster.should.have.key("DeletionProtection").equal(False) assert cluster["DeletionProtection"] is False
cluster.should.have.key("HttpEndpointEnabled").equal(False) assert cluster["HttpEndpointEnabled"] is False
cluster.should.have.key("CopyTagsToSnapshot").equal(False) assert cluster["CopyTagsToSnapshot"] is False
cluster.should.have.key("CrossAccountClone").equal(False) assert cluster["CrossAccountClone"] is False
cluster.should.have.key("DeletionProtection").equal(False) assert cluster["DeletionProtection"] is False
cluster.should.have.key("DomainMemberships").equal([]) assert cluster["DomainMemberships"] == []
cluster.should.have.key("TagList").equal([]) assert cluster["TagList"] == []
cluster.should.have.key("ClusterCreateTime") assert "ClusterCreateTime" in cluster
cluster.should.have.key( assert cluster["EarliestRestorableTime"] >= cluster["ClusterCreateTime"]
"EarliestRestorableTime"
).should.be.greater_than_or_equal_to(cluster["ClusterCreateTime"])
@mock_rds @mock_rds
@ -221,14 +224,14 @@ def test_create_db_cluster_additional_parameters():
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
cluster.should.have.key("AvailabilityZones").equal(["eu-north-1b"]) assert cluster["AvailabilityZones"] == ["eu-north-1b"]
cluster.should.have.key("DatabaseName").equal("users") assert cluster["DatabaseName"] == "users"
cluster.should.have.key("Engine").equal("aurora") assert cluster["Engine"] == "aurora"
cluster.should.have.key("EngineVersion").equal("8.0.mysql_aurora.3.01.0") assert cluster["EngineVersion"] == "8.0.mysql_aurora.3.01.0"
cluster.should.have.key("EngineMode").equal("serverless") assert cluster["EngineMode"] == "serverless"
cluster.should.have.key("Port").equal(1234) assert cluster["Port"] == 1234
cluster.should.have.key("DeletionProtection").equal(True) assert cluster["DeletionProtection"] is True
cluster.should.have.key("EnabledCloudwatchLogsExports").equals(["audit"]) assert cluster["EnabledCloudwatchLogsExports"] == ["audit"]
assert cluster["KmsKeyId"] == "some:kms:arn" assert cluster["KmsKeyId"] == "some:kms:arn"
assert cluster["NetworkType"] == "IPV4" assert cluster["NetworkType"] == "IPV4"
assert cluster["DBSubnetGroup"] == "subnetgroupname" assert cluster["DBSubnetGroup"] == "subnetgroupname"
@ -258,15 +261,19 @@ def test_describe_db_cluster_after_creation():
MasterUserPassword="hunter2_", MasterUserPassword="hunter2_",
)["DBCluster"]["DBClusterArn"] )["DBCluster"]["DBClusterArn"]
client.describe_db_clusters()["DBClusters"].should.have.length_of(2) assert len(client.describe_db_clusters()["DBClusters"]) == 2
client.describe_db_clusters(DBClusterIdentifier="cluster-id2")[ assert (
"DBClusters" len(
].should.have.length_of(1) client.describe_db_clusters(DBClusterIdentifier="cluster-id2")["DBClusters"]
)
== 1
)
client.describe_db_clusters(DBClusterIdentifier=cluster_arn)[ assert (
"DBClusters" len(client.describe_db_clusters(DBClusterIdentifier=cluster_arn)["DBClusters"])
].should.have.length_of(1) == 1
)
@mock_rds @mock_rds
@ -282,7 +289,7 @@ def test_delete_db_cluster():
client.delete_db_cluster(DBClusterIdentifier="cluster-id") client.delete_db_cluster(DBClusterIdentifier="cluster-id")
client.describe_db_clusters()["DBClusters"].should.have.length_of(0) assert len(client.describe_db_clusters()["DBClusters"]) == 0
@mock_rds @mock_rds
@ -299,7 +306,7 @@ def test_delete_db_cluster_do_snapshot():
client.delete_db_cluster( client.delete_db_cluster(
DBClusterIdentifier="cluster-id", FinalDBSnapshotIdentifier="final-snapshot" DBClusterIdentifier="cluster-id", FinalDBSnapshotIdentifier="final-snapshot"
) )
client.describe_db_clusters()["DBClusters"].should.have.length_of(0) assert len(client.describe_db_clusters()["DBClusters"]) == 0
snapshot = client.describe_db_cluster_snapshots()["DBClusterSnapshots"][0] snapshot = client.describe_db_cluster_snapshots()["DBClusterSnapshots"][0]
assert snapshot["DBClusterIdentifier"] == "cluster-id" assert snapshot["DBClusterIdentifier"] == "cluster-id"
assert snapshot["DBClusterSnapshotIdentifier"] == "final-snapshot" assert snapshot["DBClusterSnapshotIdentifier"] == "final-snapshot"
@ -321,7 +328,7 @@ def test_delete_db_cluster_that_is_protected():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.delete_db_cluster(DBClusterIdentifier="cluster-id") client.delete_db_cluster(DBClusterIdentifier="cluster-id")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal("Can't delete Cluster with protection enabled") assert err["Message"] == "Can't delete Cluster with protection enabled"
@mock_rds @mock_rds
@ -331,8 +338,8 @@ def test_delete_db_cluster_unknown_cluster():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.delete_db_cluster(DBClusterIdentifier="cluster-unknown") client.delete_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault") assert err["Code"] == "DBClusterNotFoundFault"
err["Message"].should.equal("DBCluster cluster-unknown not found.") assert err["Message"] == "DBCluster cluster-unknown not found."
@mock_rds @mock_rds
@ -342,8 +349,8 @@ def test_start_db_cluster_unknown_cluster():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.start_db_cluster(DBClusterIdentifier="cluster-unknown") client.start_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault") assert err["Code"] == "DBClusterNotFoundFault"
err["Message"].should.equal("DBCluster cluster-unknown not found.") assert err["Message"] == "DBCluster cluster-unknown not found."
@mock_rds @mock_rds
@ -360,7 +367,7 @@ def test_start_db_cluster_after_stopping():
client.start_db_cluster(DBClusterIdentifier="cluster-id") client.start_db_cluster(DBClusterIdentifier="cluster-id")
cluster = client.describe_db_clusters()["DBClusters"][0] cluster = client.describe_db_clusters()["DBClusters"][0]
cluster["Status"].should.equal("available") assert cluster["Status"] == "available"
@mock_rds @mock_rds
@ -377,8 +384,8 @@ def test_start_db_cluster_without_stopping():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.start_db_cluster(DBClusterIdentifier="cluster-id") client.start_db_cluster(DBClusterIdentifier="cluster-id")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidDBClusterStateFault") assert err["Code"] == "InvalidDBClusterStateFault"
err["Message"].should.equal("DbCluster cluster-id is not in stopped state.") assert err["Message"] == "DbCluster cluster-id is not in stopped state."
@mock_rds @mock_rds
@ -395,11 +402,11 @@ def test_stop_db_cluster():
resp = client.stop_db_cluster(DBClusterIdentifier="cluster-id") resp = client.stop_db_cluster(DBClusterIdentifier="cluster-id")
# Quirk of the AWS implementation - the immediate response show it's still available # Quirk of the AWS implementation - the immediate response show it's still available
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
cluster["Status"].should.equal("available") assert cluster["Status"] == "available"
# For some time the status will be 'stopping' # For some time the status will be 'stopping'
# And finally it will be 'stopped' # And finally it will be 'stopped'
cluster = client.describe_db_clusters()["DBClusters"][0] cluster = client.describe_db_clusters()["DBClusters"][0]
cluster["Status"].should.equal("stopped") assert cluster["Status"] == "stopped"
@mock_rds @mock_rds
@ -418,8 +425,8 @@ def test_stop_db_cluster_already_stopped():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.stop_db_cluster(DBClusterIdentifier="cluster-id") client.stop_db_cluster(DBClusterIdentifier="cluster-id")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("InvalidDBClusterStateFault") assert err["Code"] == "InvalidDBClusterStateFault"
err["Message"].should.equal("DbCluster cluster-id is not in available state.") assert err["Message"] == "DbCluster cluster-id is not in available state."
@mock_rds @mock_rds
@ -429,8 +436,8 @@ def test_stop_db_cluster_unknown_cluster():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.stop_db_cluster(DBClusterIdentifier="cluster-unknown") client.stop_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault") assert err["Code"] == "DBClusterNotFoundFault"
err["Message"].should.equal("DBCluster cluster-unknown not found.") assert err["Message"] == "DBCluster cluster-unknown not found."
@mock_rds @mock_rds
@ -441,7 +448,7 @@ def test_create_db_cluster_snapshot_fails_for_unknown_cluster():
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal("DBCluster db-primary-1 not found.") assert err["Message"] == "DBCluster db-primary-1 not found."
@mock_rds @mock_rds
@ -467,7 +474,7 @@ def test_create_db_cluster_snapshot():
assert snapshot["DBClusterSnapshotIdentifier"] == "g-1" assert snapshot["DBClusterSnapshotIdentifier"] == "g-1"
assert snapshot["SnapshotType"] == "manual" assert snapshot["SnapshotType"] == "manual"
result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"]) result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"])
result["TagList"].should.equal([]) assert result["TagList"] == []
@mock_rds @mock_rds
@ -491,14 +498,15 @@ def test_create_db_cluster_snapshot_copy_tags():
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="g-1" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="g-1"
).get("DBClusterSnapshot") ).get("DBClusterSnapshot")
snapshot.get("Engine").should.equal("postgres") assert snapshot.get("Engine") == "postgres"
snapshot.get("DBClusterIdentifier").should.equal("db-primary-1") assert snapshot.get("DBClusterIdentifier") == "db-primary-1"
snapshot.get("DBClusterSnapshotIdentifier").should.equal("g-1") assert snapshot.get("DBClusterSnapshotIdentifier") == "g-1"
result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"]) result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"])
result["TagList"].should.equal( assert result["TagList"] == [
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}] {"Value": "bar", "Key": "foo"},
) {"Value": "bar1", "Key": "foo1"},
]
@mock_rds @mock_rds
@ -512,7 +520,7 @@ def test_copy_db_cluster_snapshot_fails_for_unknown_snapshot():
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal("DBClusterSnapshot snapshot-1 not found.") assert err["Message"] == "DBClusterSnapshot snapshot-1 not found."
@mock_rds @mock_rds
@ -539,13 +547,13 @@ def test_copy_db_cluster_snapshot():
TargetDBClusterSnapshotIdentifier="snapshot-2", TargetDBClusterSnapshotIdentifier="snapshot-2",
).get("DBClusterSnapshot") ).get("DBClusterSnapshot")
target_snapshot.get("Engine").should.equal("postgres") assert target_snapshot.get("Engine") == "postgres"
target_snapshot.get("DBClusterIdentifier").should.equal("db-primary-1") assert target_snapshot.get("DBClusterIdentifier") == "db-primary-1"
target_snapshot.get("DBClusterSnapshotIdentifier").should.equal("snapshot-2") assert target_snapshot.get("DBClusterSnapshotIdentifier") == "snapshot-2"
result = conn.list_tags_for_resource( result = conn.list_tags_for_resource(
ResourceName=target_snapshot["DBClusterSnapshotArn"] ResourceName=target_snapshot["DBClusterSnapshotArn"]
) )
result["TagList"].should.equal([]) assert result["TagList"] == []
@mock_rds @mock_rds
@ -578,8 +586,9 @@ def test_copy_db_cluster_snapshot_fails_for_existed_target_snapshot():
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal( assert err["Message"] == (
"Cannot create the snapshot because a snapshot with the identifier snapshot-2 already exists." "Cannot create the snapshot because a snapshot with the identifier "
"snapshot-2 already exists."
) )
@ -601,7 +610,7 @@ def test_describe_db_cluster_snapshots():
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
).get("DBClusterSnapshot") ).get("DBClusterSnapshot")
created.get("Engine").should.equal("postgres") assert created.get("Engine") == "postgres"
by_database_id = conn.describe_db_cluster_snapshots( by_database_id = conn.describe_db_cluster_snapshots(
DBClusterIdentifier="db-primary-1" DBClusterIdentifier="db-primary-1"
@ -609,11 +618,11 @@ def test_describe_db_cluster_snapshots():
by_snapshot_id = conn.describe_db_cluster_snapshots( by_snapshot_id = conn.describe_db_cluster_snapshots(
DBClusterSnapshotIdentifier="snapshot-1" DBClusterSnapshotIdentifier="snapshot-1"
).get("DBClusterSnapshots") ).get("DBClusterSnapshots")
by_snapshot_id.should.equal(by_database_id) assert by_snapshot_id == by_database_id
snapshot = by_snapshot_id[0] snapshot = by_snapshot_id[0]
snapshot.should.equal(created) assert snapshot == created
snapshot.get("Engine").should.equal("postgres") assert snapshot.get("Engine") == "postgres"
conn.create_db_cluster_snapshot( conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-2" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-2"
@ -621,7 +630,7 @@ def test_describe_db_cluster_snapshots():
snapshots = conn.describe_db_cluster_snapshots( snapshots = conn.describe_db_cluster_snapshots(
DBClusterIdentifier="db-primary-1" DBClusterIdentifier="db-primary-1"
).get("DBClusterSnapshots") ).get("DBClusterSnapshots")
snapshots.should.have.length_of(2) assert len(snapshots) == 2
@mock_rds @mock_rds
@ -643,9 +652,8 @@ def test_delete_db_cluster_snapshot():
conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1") conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1")
conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier="snapshot-1") conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier="snapshot-1")
conn.describe_db_cluster_snapshots.when.called_with( with pytest.raises(ClientError):
DBClusterSnapshotIdentifier="snapshot-1" conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1")
).should.throw(ClientError)
@mock_rds @mock_rds
@ -661,7 +669,7 @@ def test_restore_db_cluster_from_snapshot():
MasterUserPassword="hunter2000", MasterUserPassword="hunter2000",
Port=1234, Port=1234,
) )
conn.describe_db_clusters()["DBClusters"].should.have.length_of(1) assert len(conn.describe_db_clusters()["DBClusters"]) == 1
conn.create_db_cluster_snapshot( conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
@ -673,17 +681,18 @@ def test_restore_db_cluster_from_snapshot():
SnapshotIdentifier="snapshot-1", SnapshotIdentifier="snapshot-1",
Engine="postgres", Engine="postgres",
)["DBCluster"] )["DBCluster"]
new_cluster["DBClusterIdentifier"].should.equal("db-restore-1") assert new_cluster["DBClusterIdentifier"] == "db-restore-1"
new_cluster["DBClusterInstanceClass"].should.equal("db.m1.small") assert new_cluster["DBClusterInstanceClass"] == "db.m1.small"
new_cluster["Engine"].should.equal("postgres") assert new_cluster["Engine"] == "postgres"
new_cluster["DatabaseName"].should.equal("staging-postgres") assert new_cluster["DatabaseName"] == "staging-postgres"
new_cluster["Port"].should.equal(1234) assert new_cluster["Port"] == 1234
# Verify it exists # Verify it exists
conn.describe_db_clusters()["DBClusters"].should.have.length_of(2) assert len(conn.describe_db_clusters()["DBClusters"]) == 2
conn.describe_db_clusters(DBClusterIdentifier="db-restore-1")[ assert (
"DBClusters" len(conn.describe_db_clusters(DBClusterIdentifier="db-restore-1")["DBClusters"])
].should.have.length_of(1) == 1
)
@mock_rds @mock_rds
@ -699,7 +708,7 @@ def test_restore_db_cluster_from_snapshot_and_override_params():
MasterUserPassword="hunter2000", MasterUserPassword="hunter2000",
Port=1234, Port=1234,
) )
conn.describe_db_clusters()["DBClusters"].should.have.length_of(1) assert len(conn.describe_db_clusters()["DBClusters"]) == 1
conn.create_db_cluster_snapshot( conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
) )
@ -712,10 +721,10 @@ def test_restore_db_cluster_from_snapshot_and_override_params():
Port=10000, Port=10000,
DBClusterInstanceClass="db.r6g.xlarge", DBClusterInstanceClass="db.r6g.xlarge",
)["DBCluster"] )["DBCluster"]
new_cluster["DBClusterIdentifier"].should.equal("db-restore-1") assert new_cluster["DBClusterIdentifier"] == "db-restore-1"
new_cluster["DBClusterParameterGroup"].should.equal("default.aurora8.0") assert new_cluster["DBClusterParameterGroup"] == "default.aurora8.0"
new_cluster["DBClusterInstanceClass"].should.equal("db.r6g.xlarge") assert new_cluster["DBClusterInstanceClass"] == "db.r6g.xlarge"
new_cluster["Port"].should.equal(10000) assert new_cluster["Port"] == 10000
@mock_rds @mock_rds
@ -739,12 +748,12 @@ def test_add_tags_to_cluster():
) )
tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"] tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"]
tags.should.equal([{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]) assert tags == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]
conn.remove_tags_from_resource(ResourceName=cluster_arn, TagKeys=["k1"]) conn.remove_tags_from_resource(ResourceName=cluster_arn, TagKeys=["k1"])
tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"] tags = conn.list_tags_for_resource(ResourceName=cluster_arn)["TagList"]
tags.should.equal([{"Key": "k2", "Value": "v2"}]) assert tags == [{"Key": "k2", "Value": "v2"}]
@mock_rds @mock_rds
@ -771,12 +780,12 @@ def test_add_tags_to_cluster_snapshot():
) )
tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"] tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"]
tags.should.equal([{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]) assert tags == [{"Key": "k1", "Value": "v1"}, {"Key": "k2", "Value": "v2"}]
conn.remove_tags_from_resource(ResourceName=snapshot_arn, TagKeys=["k1"]) conn.remove_tags_from_resource(ResourceName=snapshot_arn, TagKeys=["k1"])
tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"] tags = conn.list_tags_for_resource(ResourceName=snapshot_arn)["TagList"]
tags.should.equal([{"Key": "k2", "Value": "v2"}]) assert tags == [{"Key": "k2", "Value": "v2"}]
@mock_rds @mock_rds
@ -795,7 +804,7 @@ def test_create_serverless_db_cluster():
) )
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
# This is only true for specific engine versions # This is only true for specific engine versions
cluster.should.have.key("HttpEndpointEnabled").equal(True) assert cluster["HttpEndpointEnabled"] is True
# Verify that a default serverless_configuration is added # Verify that a default serverless_configuration is added
assert "ScalingConfigurationInfo" in cluster assert "ScalingConfigurationInfo" in cluster
@ -819,7 +828,7 @@ def test_create_db_cluster_with_enable_http_endpoint_invalid():
) )
cluster = resp["DBCluster"] cluster = resp["DBCluster"]
# This attribute is ignored if an invalid engine version is supplied # This attribute is ignored if an invalid engine version is supplied
cluster.should.have.key("HttpEndpointEnabled").equal(False) assert cluster["HttpEndpointEnabled"] is False
@mock_rds @mock_rds
@ -859,8 +868,10 @@ def test_describe_db_clusters_filter_by_engine():
@mock_rds @mock_rds
def test_replicate_cluster(): def test_replicate_cluster():
# WHEN create_db_cluster is called # WHEN create_db_cluster is called
# AND create_db_cluster is called again with ReplicationSourceIdentifier set to the first cluster # AND create_db_cluster is called again with ReplicationSourceIdentifier
# THEN promote_read_replica_db_cluster can be called on the second cluster, elevating it to a read/write cluster # set to the first cluster
# THEN promote_read_replica_db_cluster can be called on the second
# cluster, elevating it to a read/write cluster
us_east = boto3.client("rds", "us-east-1") us_east = boto3.client("rds", "us-east-1")
us_west = boto3.client("rds", "us-west-1") us_west = boto3.client("rds", "us-west-1")

View File

@ -1,7 +1,7 @@
import boto3 import boto3
from botocore.exceptions import ClientError
import pytest import pytest
from botocore.exceptions import ClientError
from moto import mock_rds from moto import mock_rds
@ -11,7 +11,7 @@ def test_add_instance_as_cluster_member():
# the instance is included as a ClusterMember in the describe_db_clusters call # the instance is included as a ClusterMember in the describe_db_clusters call
client = boto3.client("rds", "us-east-1") client = boto3.client("rds", "us-east-1")
client.create_db_cluster( _ = client.create_db_cluster(
DBClusterIdentifier="dbci", DBClusterIdentifier="dbci",
Engine="mysql", Engine="mysql",
MasterUsername="masterusername", MasterUsername="masterusername",
@ -43,7 +43,7 @@ def test_remove_instance_from_cluster():
# the instance is included as a ClusterMember in the describe_db_clusters call # the instance is included as a ClusterMember in the describe_db_clusters call
client = boto3.client("rds", "us-east-1") client = boto3.client("rds", "us-east-1")
client.create_db_cluster( _ = client.create_db_cluster(
DBClusterIdentifier="dbci", DBClusterIdentifier="dbci",
Engine="mysql", Engine="mysql",
MasterUsername="masterusername", MasterUsername="masterusername",
@ -72,7 +72,7 @@ def test_remove_instance_from_cluster():
def test_add_instance_to_serverless_cluster(): def test_add_instance_to_serverless_cluster():
client = boto3.client("rds", "us-east-1") client = boto3.client("rds", "us-east-1")
client.create_db_cluster( _ = client.create_db_cluster(
DBClusterIdentifier="dbci", DBClusterIdentifier="dbci",
Engine="aurora", Engine="aurora",
EngineMode="serverless", EngineMode="serverless",

View File

@ -1,8 +1,7 @@
import boto3 import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest
from moto import mock_rds from moto import mock_rds
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -44,16 +43,16 @@ def test_create_event_subscription():
SourceIds=[db_identifier], SourceIds=[db_identifier],
).get("EventSubscription") ).get("EventSubscription")
es["CustSubscriptionId"].should.equal(f"{db_identifier}-events") assert es["CustSubscriptionId"] == f"{db_identifier}-events"
es["SnsTopicArn"].should.equal( assert es["SnsTopicArn"] == (
f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic" f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic"
) )
es["SourceType"].should.equal("db-instance") assert es["SourceType"] == "db-instance"
es["EventCategoriesList"].should.equal( assert es["EventCategoriesList"] == (
["Backup", "Creation", "Deletion", "Failure", "Recovery", "Restoration"] ["Backup", "Creation", "Deletion", "Failure", "Recovery", "Restoration"]
) )
es["SourceIdsList"].should.equal([db_identifier]) assert es["SourceIdsList"] == [db_identifier]
es["Enabled"].should.equal(False) assert es["Enabled"] is False
@mock_rds @mock_rds
@ -75,8 +74,8 @@ def test_create_event_fail_already_exists():
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionAlreadyExistFault") assert err["Code"] == "SubscriptionAlreadyExistFault"
err["Message"].should.equal("Subscription db-primary-1-events already exists.") assert err["Message"] == "Subscription db-primary-1-events already exists."
@mock_rds @mock_rds
@ -86,8 +85,8 @@ def test_delete_event_subscription_fails_unknown_subscription():
client.delete_event_subscription(SubscriptionName="my-db-events") client.delete_event_subscription(SubscriptionName="my-db-events")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionNotFoundFault") assert err["Code"] == "SubscriptionNotFoundFault"
err["Message"].should.equal("Subscription my-db-events not found.") assert err["Message"] == "Subscription my-db-events not found."
@mock_rds @mock_rds
@ -104,8 +103,8 @@ def test_delete_event_subscription():
SubscriptionName=f"{db_identifier}-events" SubscriptionName=f"{db_identifier}-events"
).get("EventSubscription") ).get("EventSubscription")
es["CustSubscriptionId"].should.equal(f"{db_identifier}-events") assert es["CustSubscriptionId"] == f"{db_identifier}-events"
es["SnsTopicArn"].should.equal( assert es["SnsTopicArn"] == (
f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic" f"arn:aws:sns::{ACCOUNT_ID}:{db_identifier}-events-topic"
) )
@ -122,8 +121,8 @@ def test_describe_event_subscriptions():
subscriptions = client.describe_event_subscriptions().get("EventSubscriptionsList") subscriptions = client.describe_event_subscriptions().get("EventSubscriptionsList")
subscriptions.should.have.length_of(1) assert len(subscriptions) == 1
subscriptions[0]["CustSubscriptionId"].should.equal(f"{db_identifier}-events") assert subscriptions[0]["CustSubscriptionId"] == f"{db_identifier}-events"
@mock_rds @mock_rds
@ -134,5 +133,5 @@ def test_describe_event_subscriptions_fails_unknown_subscription():
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("SubscriptionNotFoundFault") assert err["Code"] == "SubscriptionNotFoundFault"
err["Message"].should.equal("Subscription my-db-events not found.") assert err["Message"] == "Subscription my-db-events not found."

View File

@ -1,8 +1,7 @@
import boto3 import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest
from moto import mock_rds from moto import mock_rds
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -63,8 +62,8 @@ def test_start_export_task_fails_unknown_snapshot():
) )
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("DBSnapshotNotFound") assert err["Code"] == "DBSnapshotNotFound"
err["Message"].should.equal("DBSnapshot snapshot-1 not found.") assert err["Message"] == "DBSnapshot snapshot-1 not found."
@mock_rds @mock_rds
@ -82,16 +81,16 @@ def test_start_export_task_db():
ExportOnly=["schema.table"], ExportOnly=["schema.table"],
) )
export["ExportTaskIdentifier"].should.equal("export-snapshot-1") assert export["ExportTaskIdentifier"] == "export-snapshot-1"
export["SourceArn"].should.equal(source_arn) assert export["SourceArn"] == source_arn
export["S3Bucket"].should.equal("export-bucket") assert export["S3Bucket"] == "export-bucket"
export["S3Prefix"].should.equal("snaps/") assert export["S3Prefix"] == "snaps/"
export["IamRoleArn"].should.equal("arn:aws:iam:::role/export-role") assert export["IamRoleArn"] == "arn:aws:iam:::role/export-role"
export["KmsKeyId"].should.equal( assert export["KmsKeyId"] == (
"arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE" "arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE"
) )
export["ExportOnly"].should.equal(["schema.table"]) assert export["ExportOnly"] == ["schema.table"]
export["SourceType"].should.equal("SNAPSHOT") assert export["SourceType"] == "SNAPSHOT"
@mock_rds @mock_rds
@ -109,16 +108,16 @@ def test_start_export_task_db_cluster():
ExportOnly=["schema.table"], ExportOnly=["schema.table"],
) )
export["ExportTaskIdentifier"].should.equal("export-snapshot-1") assert export["ExportTaskIdentifier"] == "export-snapshot-1"
export["SourceArn"].should.equal(source_arn) assert export["SourceArn"] == source_arn
export["S3Bucket"].should.equal("export-bucket") assert export["S3Bucket"] == "export-bucket"
export["S3Prefix"].should.equal("snaps/") assert export["S3Prefix"] == "snaps/"
export["IamRoleArn"].should.equal("arn:aws:iam:::role/export-role") assert export["IamRoleArn"] == "arn:aws:iam:::role/export-role"
export["KmsKeyId"].should.equal( assert export["KmsKeyId"] == (
"arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE" "arn:aws:kms:::key/0ea3fef3-80a7-4778-9d8c-1c0c6EXAMPLE"
) )
export["ExportOnly"].should.equal(["schema.table"]) assert export["ExportOnly"] == ["schema.table"]
export["SourceType"].should.equal("CLUSTER") assert export["SourceType"] == "CLUSTER"
@mock_rds @mock_rds
@ -143,9 +142,10 @@ def test_start_export_task_fail_already_exists():
) )
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskAlreadyExistsFault") assert err["Code"] == "ExportTaskAlreadyExistsFault"
err["Message"].should.equal( assert err["Message"] == (
"Cannot start export task because a task with the identifier export-snapshot-1 already exists." "Cannot start export task because a task with the identifier "
"export-snapshot-1 already exists."
) )
@ -156,9 +156,10 @@ def test_cancel_export_task_fails_unknown_task():
client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1") client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskNotFoundFault") assert err["Code"] == "ExportTaskNotFoundFault"
err["Message"].should.equal( assert err["Message"] == (
"Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist." "Cannot cancel export task because a task with the identifier "
"export-snapshot-1 is not exist."
) )
@ -177,8 +178,8 @@ def test_cancel_export_task():
export = client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1") export = client.cancel_export_task(ExportTaskIdentifier="export-snapshot-1")
export["ExportTaskIdentifier"].should.equal("export-snapshot-1") assert export["ExportTaskIdentifier"] == "export-snapshot-1"
export["Status"].should.equal("canceled") assert export["Status"] == "canceled"
@mock_rds @mock_rds
@ -195,8 +196,8 @@ def test_describe_export_tasks():
exports = client.describe_export_tasks().get("ExportTasks") exports = client.describe_export_tasks().get("ExportTasks")
exports.should.have.length_of(1) assert len(exports) == 1
exports[0]["ExportTaskIdentifier"].should.equal("export-snapshot-1") assert exports[0]["ExportTaskIdentifier"] == "export-snapshot-1"
@mock_rds @mock_rds
@ -206,7 +207,8 @@ def test_describe_export_tasks_fails_unknown_task():
client.describe_export_tasks(ExportTaskIdentifier="export-snapshot-1") client.describe_export_tasks(ExportTaskIdentifier="export-snapshot-1")
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("ExportTaskNotFoundFault") assert err["Code"] == "ExportTaskNotFoundFault"
err["Message"].should.equal( assert err["Message"] == (
"Cannot cancel export task because a task with the identifier export-snapshot-1 is not exist." "Cannot cancel export task because a task with the identifier "
"export-snapshot-1 is not exist."
) )

View File

@ -1,6 +1,5 @@
from urllib.parse import urlencode from urllib.parse import urlencode
import moto.server as server import moto.server as server
import sure # noqa # pylint: disable=unused-import
def test_list_databases(): def test_list_databases():
@ -9,7 +8,7 @@ def test_list_databases():
res = test_client.get("/?Action=DescribeDBInstances") res = test_client.get("/?Action=DescribeDBInstances")
res.data.decode("utf-8").should.contain("<DescribeDBInstancesResult>") assert "<DescribeDBInstancesResult>" in res.data.decode("utf-8")
def test_create_db_instance(): def test_create_db_instance():
@ -28,10 +27,10 @@ def test_create_db_instance():
resp = test_client.post(f"/?{qs}") resp = test_client.post(f"/?{qs}")
response = resp.data.decode("utf-8") response = resp.data.decode("utf-8")
response.shouldnt.contain("<DBClusterIdentifier>") assert "<DBClusterIdentifier>" not in response
response.should.contain("<DBInstanceIdentifier>hi</DBInstanceIdentifier") assert "<DBInstanceIdentifier>hi</DBInstanceIdentifier" in response
# We do not pass these values - they should default to false # We do not pass these values - they should default to false
response.should.contain("<MultiAZ>false</MultiAZ>") assert "<MultiAZ>false</MultiAZ>" in response
response.should.contain( assert (
"<IAMDatabaseAuthenticationEnabled>false</IAMDatabaseAuthenticationEnabled>" "<IAMDatabaseAuthenticationEnabled>false</IAMDatabaseAuthenticationEnabled>"
) ) in response

View File

@ -11,7 +11,7 @@ from moto.rds.utils import (
) )
class TestFilterValidation(object): class TestFilterValidation:
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
cls.filter_defs = { cls.filter_defs = {
@ -37,13 +37,13 @@ class TestFilterValidation(object):
validate_filters(filters, self.filter_defs) validate_filters(filters, self.filter_defs)
class Resource(object): class Resource:
def __init__(self, identifier, **kwargs): def __init__(self, identifier, **kwargs):
self.identifier = identifier self.identifier = identifier
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
class TestResourceFiltering(object): class TestResourceFiltering:
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
cls.filter_defs = { cls.filter_defs = {
@ -64,31 +64,31 @@ class TestResourceFiltering(object):
def test_filtering_on_nested_attribute(self): def test_filtering_on_nested_attribute(self):
filters = {"nested-resource": ["nested-id-1"]} filters = {"nested-resource": ["nested-id-1"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs) filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(1) assert len(filtered_resources) == 1
filtered_resources.should.have.key("identifier-3") assert "identifier-3" in filtered_resources
def test_filtering_on_common_attribute(self): def test_filtering_on_common_attribute(self):
filters = {"common-attr": ["common"]} filters = {"common-attr": ["common"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs) filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(2) assert len(filtered_resources) == 2
filtered_resources.should.have.key("identifier-1") assert "identifier-1" in filtered_resources
filtered_resources.should.have.key("identifier-4") assert "identifier-4" in filtered_resources
def test_filtering_on_multiple_attributes(self): def test_filtering_on_multiple_attributes(self):
filters = {"multiple-attrs": ["common"]} filters = {"multiple-attrs": ["common"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs) filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(3) assert len(filtered_resources) == 3
filtered_resources.should.have.key("identifier-1") assert "identifier-1" in filtered_resources
filtered_resources.should.have.key("identifier-4") assert "identifier-4" in filtered_resources
filtered_resources.should.have.key("identifier-5") assert "identifier-5" in filtered_resources
def test_filters_with_multiple_values(self): def test_filters_with_multiple_values(self):
filters = {"identifier": ["identifier-0", "identifier-3", "identifier-5"]} filters = {"identifier": ["identifier-0", "identifier-3", "identifier-5"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs) filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(3) assert len(filtered_resources) == 3
filtered_resources.should.have.key("identifier-0") assert "identifier-0" in filtered_resources
filtered_resources.should.have.key("identifier-3") assert "identifier-3" in filtered_resources
filtered_resources.should.have.key("identifier-5") assert "identifier-5" in filtered_resources
def test_multiple_filters(self): def test_multiple_filters(self):
filters = { filters = {
@ -97,11 +97,11 @@ class TestResourceFiltering(object):
"multiple-attrs": ["common"], "multiple-attrs": ["common"],
} }
filtered_resources = apply_filter(self.resources, filters, self.filter_defs) filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(1) assert len(filtered_resources) == 1
filtered_resources.should.have.key("identifier-1") assert "identifier-1" in filtered_resources
class TestMergingFilters(object): class TestMergingFilters:
def test_when_filters_to_update_is_none(self): def test_when_filters_to_update_is_none(self):
filters_to_update = {"filter-name": ["value1"]} filters_to_update = {"filter-name": ["value1"]}
merged = merge_filters(filters_to_update, None) merged = merge_filters(filters_to_update, None)
@ -135,12 +135,15 @@ class TestMergingFilters(object):
} }
merged = merge_filters(filters_to_update, filters_to_merge) merged = merge_filters(filters_to_update, filters_to_merge)
assert len(merged.keys()) == 4 assert len(merged.keys()) == 4
for key in merged.keys(): for value in merged.values():
assert merged[key] == ["value1", "value2"] assert value == ["value1", "value2"]
def test_encode_orderable_db_instance(): def test_encode_orderable_db_instance():
# Data from AWS comes in a specific format. Verify we can encode/decode it to something more compact """Verify the data can be encoded/decoded to something more compact.
Data from AWS comes in a specific format.
"""
original = { original = {
"Engine": "neptune", "Engine": "neptune",
"EngineVersion": "1.0.3.0", "EngineVersion": "1.0.3.0",
@ -173,11 +176,15 @@ def test_encode_orderable_db_instance():
"Support edNetworkTypes": ["IPV4"], "Support edNetworkTypes": ["IPV4"],
} }
short = encode_orderable_db_instance(original) short = encode_orderable_db_instance(original)
decode_orderable_db_instance(short).should.equal(original) assert decode_orderable_db_instance(short) == original
def test_encode_orderable_db_instance__short_format(): def test_encode_orderable_db_instance__short_format():
# Verify this works in a random format. We don't know for sure what AWS returns, so it should always work regardless of the input """Verify this works in a random format.
We don't know for sure what AWS returns, so it should always work
regardless of the input.
"""
short = { short = {
"Engine": "neptune", "Engine": "neptune",
"EngineVersion": "1.0.3.0", "EngineVersion": "1.0.3.0",
@ -190,12 +197,10 @@ def test_encode_orderable_db_instance__short_format():
"SupportsClusters": True, "SupportsClusters": True,
"SupportedNetworkTypes": ["IPV4"], "SupportedNetworkTypes": ["IPV4"],
} }
decode_orderable_db_instance(encode_orderable_db_instance(short)).should.equal( assert decode_orderable_db_instance(encode_orderable_db_instance(short)) == short
short
)
def test_verify_encoding_is_unique(): def test_verify_encoding_is_unique():
len(set(ORDERABLE_DB_INSTANCE_ENCODING.values())).should.equal( assert len(set(ORDERABLE_DB_INSTANCE_ENCODING.values())) == len(
len(ORDERABLE_DB_INSTANCE_ENCODING.keys()) ORDERABLE_DB_INSTANCE_ENCODING.keys()
) )