moto/tests/test_rds2/test_rds2_clusters.py

623 lines
21 KiB
Python
Raw Normal View History

import boto3
import pytest
2021-10-18 19:44:29 +00:00
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_rds2
from moto.core import ACCOUNT_ID
@mock_rds2
def test_describe_db_cluster_initial():
client = boto3.client("rds", region_name="eu-north-1")
resp = client.describe_db_clusters()
resp.should.have.key("DBClusters").should.have.length_of(0)
@mock_rds2
def test_create_db_cluster_needs_master_username():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.create_db_cluster(DBClusterIdentifier="cluster-id", Engine="aurora")
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue")
err["Message"].should.equal(
"The parameter MasterUsername must be provided and must not be blank."
)
@mock_rds2
def test_create_db_cluster_needs_master_user_password():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.create_db_cluster(
DBClusterIdentifier="cluster-id", Engine="aurora", MasterUsername="root"
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue")
err["Message"].should.equal(
"The parameter MasterUserPassword must be provided and must not be blank."
)
@mock_rds2
def test_create_db_cluster_needs_long_master_user_password():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2",
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue")
err["Message"].should.equal(
"The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters."
)
@mock_rds2
def test_create_db_cluster__verify_default_properties():
client = boto3.client("rds", region_name="eu-north-1")
resp = client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
resp.should.have.key("DBCluster")
cluster = resp["DBCluster"]
cluster.shouldnt.have.key(
"DatabaseName"
) # This was not supplied, so should not be returned
cluster.should.have.key("AllocatedStorage").equal(1)
cluster.should.have.key("AvailabilityZones")
set(cluster["AvailabilityZones"]).should.equal(
{"eu-north-1a", "eu-north-1b", "eu-north-1c"}
)
cluster.should.have.key("BackupRetentionPeriod").equal(1)
cluster.should.have.key("DBClusterIdentifier").equal("cluster-id")
cluster.should.have.key("DBClusterParameterGroup").equal("default.aurora8.0")
cluster.should.have.key("DBSubnetGroup").equal("default")
cluster.should.have.key("Status").equal("creating")
cluster.should.have.key("Endpoint").match(
"cluster-id.cluster-[a-z0-9]{12}.eu-north-1.rds.amazonaws.com"
)
endpoint = cluster["Endpoint"]
expected_readonly = endpoint.replace(
"cluster-id.cluster-", "cluster-id.cluster-ro-"
)
cluster.should.have.key("ReaderEndpoint").equal(expected_readonly)
cluster.should.have.key("MultiAZ").equal(False)
cluster.should.have.key("Engine").equal("aurora")
cluster.should.have.key("EngineVersion").equal("8.0.mysql_aurora.3.01.0")
cluster.should.have.key("Port").equal(3306)
cluster.should.have.key("MasterUsername").equal("root")
cluster.should.have.key("PreferredBackupWindow").equal("01:37-02:07")
cluster.should.have.key("PreferredMaintenanceWindow").equal("wed:02:40-wed:03:10")
cluster.should.have.key("ReadReplicaIdentifiers").equal([])
cluster.should.have.key("DBClusterMembers").equal([])
cluster.should.have.key("VpcSecurityGroups")
cluster.should.have.key("HostedZoneId")
cluster.should.have.key("StorageEncrypted").equal(False)
cluster.should.have.key("DbClusterResourceId").match(r"cluster-[A-Z0-9]{26}")
cluster.should.have.key("DBClusterArn").equal(
f"arn:aws:rds:eu-north-1:{ACCOUNT_ID}:cluster:cluster-id"
)
cluster.should.have.key("AssociatedRoles").equal([])
cluster.should.have.key("IAMDatabaseAuthenticationEnabled").equal(False)
cluster.should.have.key("EngineMode").equal("provisioned")
cluster.should.have.key("DeletionProtection").equal(False)
cluster.should.have.key("HttpEndpointEnabled").equal(False)
cluster.should.have.key("CopyTagsToSnapshot").equal(False)
cluster.should.have.key("CrossAccountClone").equal(False)
cluster.should.have.key("DeletionProtection").equal(False)
cluster.should.have.key("DomainMemberships").equal([])
cluster.should.have.key("TagList").equal([])
cluster.should.have.key("ClusterCreateTime")
@mock_rds2
def test_create_db_cluster_with_database_name():
client = boto3.client("rds", region_name="eu-north-1")
resp = client.create_db_cluster(
DBClusterIdentifier="cluster-id",
DatabaseName="users",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
cluster = resp["DBCluster"]
cluster.should.have.key("DatabaseName").equal("users")
cluster.should.have.key("DBClusterIdentifier").equal("cluster-id")
cluster.should.have.key("DBClusterParameterGroup").equal("default.aurora8.0")
@mock_rds2
def test_create_db_cluster_additional_parameters():
client = boto3.client("rds", region_name="eu-north-1")
resp = client.create_db_cluster(
AvailabilityZones=["eu-north-1b"],
DBClusterIdentifier="cluster-id",
Engine="aurora",
EngineVersion="8.0.mysql_aurora.3.01.0",
EngineMode="serverless",
MasterUsername="root",
MasterUserPassword="hunter2_",
Port=1234,
DeletionProtection=True,
)
cluster = resp["DBCluster"]
cluster.should.have.key("AvailabilityZones").equal(["eu-north-1b"])
cluster.should.have.key("Engine").equal("aurora")
cluster.should.have.key("EngineVersion").equal("8.0.mysql_aurora.3.01.0")
cluster.should.have.key("EngineMode").equal("serverless")
cluster.should.have.key("Port").equal(1234)
cluster.should.have.key("DeletionProtection").equal(True)
@mock_rds2
def test_describe_db_cluster_after_creation():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id1",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
client.create_db_cluster(
DBClusterIdentifier="cluster-id2",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
client.describe_db_clusters()["DBClusters"].should.have.length_of(2)
client.describe_db_clusters(DBClusterIdentifier="cluster-id2")[
"DBClusters"
].should.have.length_of(1)
@mock_rds2
def test_delete_db_cluster():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
client.delete_db_cluster(DBClusterIdentifier="cluster-id")
client.describe_db_clusters()["DBClusters"].should.have.length_of(0)
@mock_rds2
def test_delete_db_cluster_that_is_protected():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
DeletionProtection=True,
)
with pytest.raises(ClientError) as exc:
client.delete_db_cluster(DBClusterIdentifier="cluster-id")
err = exc.value.response["Error"]
err["Message"].should.equal("Can't delete Cluster with protection enabled")
@mock_rds2
def test_delete_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.delete_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault")
err["Message"].should.equal("DBCluster cluster-unknown not found.")
@mock_rds2
def test_start_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.start_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault")
err["Message"].should.equal("DBCluster cluster-unknown not found.")
@mock_rds2
def test_start_db_cluster_after_stopping():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
client.stop_db_cluster(DBClusterIdentifier="cluster-id")
client.start_db_cluster(DBClusterIdentifier="cluster-id")
cluster = client.describe_db_clusters()["DBClusters"][0]
cluster["Status"].should.equal("available")
@mock_rds2
def test_start_db_cluster_without_stopping():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
with pytest.raises(ClientError) as ex:
client.start_db_cluster(DBClusterIdentifier="cluster-id")
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidDBClusterStateFault")
err["Message"].should.equal("DbCluster cluster-id is not in stopped state.")
@mock_rds2
def test_stop_db_cluster():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
resp = client.stop_db_cluster(DBClusterIdentifier="cluster-id")
# Quirk of the AWS implementation - the immediate response show it's still available
cluster = resp["DBCluster"]
cluster["Status"].should.equal("available")
# For some time the status will be 'stopping'
# And finally it will be 'stopped'
cluster = client.describe_db_clusters()["DBClusters"][0]
cluster["Status"].should.equal("stopped")
@mock_rds2
def test_stop_db_cluster_already_stopped():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
)
client.stop_db_cluster(DBClusterIdentifier="cluster-id")
# can't call stop on a stopped cluster
with pytest.raises(ClientError) as ex:
client.stop_db_cluster(DBClusterIdentifier="cluster-id")
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidDBClusterStateFault")
err["Message"].should.equal("DbCluster cluster-id is not in available state.")
@mock_rds2
def test_stop_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.stop_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault")
err["Message"].should.equal("DBCluster cluster-unknown not found.")
@mock_rds2
def test_create_db_cluster_snapshot_fails_for_unknown_cluster():
conn = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as exc:
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
)
err = exc.value.response["Error"]
err["Message"].should.equal("DBCluster db-primary-1 not found.")
@mock_rds2
def test_create_db_cluster_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
snapshot = conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="g-1",
).get("DBClusterSnapshot")
snapshot.get("Engine").should.equal("postgres")
snapshot.get("DBClusterIdentifier").should.equal("db-primary-1")
snapshot.get("DBClusterSnapshotIdentifier").should.equal("g-1")
result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"])
result["TagList"].should.equal([])
@mock_rds2
def test_create_db_cluster_snapshot_copy_tags():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
CopyTagsToSnapshot=True,
)
snapshot = conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="g-1"
).get("DBClusterSnapshot")
snapshot.get("Engine").should.equal("postgres")
snapshot.get("DBClusterIdentifier").should.equal("db-primary-1")
snapshot.get("DBClusterSnapshotIdentifier").should.equal("g-1")
# breakpoint()
result = conn.list_tags_for_resource(ResourceName=snapshot["DBClusterSnapshotArn"])
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_copy_db_cluster_snapshot_fails_for_unknown_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as exc:
conn.copy_db_cluster_snapshot(
SourceDBClusterSnapshotIdentifier="snapshot-1",
TargetDBClusterSnapshotIdentifier="snapshot-2",
)
err = exc.value.response["Error"]
err["Message"].should.equal("DBClusterSnapshot snapshot-1 not found.")
@mock_rds2
def test_copy_db_cluster_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1",
).get("DBClusterSnapshot")
target_snapshot = conn.copy_db_cluster_snapshot(
SourceDBClusterSnapshotIdentifier="snapshot-1",
TargetDBClusterSnapshotIdentifier="snapshot-2",
).get("DBClusterSnapshot")
target_snapshot.get("Engine").should.equal("postgres")
target_snapshot.get("DBClusterIdentifier").should.equal("db-primary-1")
target_snapshot.get("DBClusterSnapshotIdentifier").should.equal("snapshot-2")
result = conn.list_tags_for_resource(
ResourceName=target_snapshot["DBClusterSnapshotArn"]
)
result["TagList"].should.equal([])
@mock_rds2
def test_copy_db_cluster_snapshot_fails_for_existed_target_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1",
).get("DBClusterSnapshot")
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-2",
).get("DBClusterSnapshot")
with pytest.raises(ClientError) as exc:
conn.copy_db_cluster_snapshot(
SourceDBClusterSnapshotIdentifier="snapshot-1",
TargetDBClusterSnapshotIdentifier="snapshot-2",
)
err = exc.value.response["Error"]
err["Message"].should.equal(
"Cannot create the snapshot because a snapshot with the identifier snapshot-2 already exists."
)
@mock_rds2
def test_describe_db_cluster_snapshots():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
created = conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
).get("DBClusterSnapshot")
created.get("Engine").should.equal("postgres")
by_database_id = conn.describe_db_cluster_snapshots(
DBClusterIdentifier="db-primary-1"
).get("DBClusterSnapshots")
by_snapshot_id = conn.describe_db_cluster_snapshots(
DBClusterSnapshotIdentifier="snapshot-1"
).get("DBClusterSnapshots")
by_snapshot_id.should.equal(by_database_id)
snapshot = by_snapshot_id[0]
snapshot.should.equal(created)
snapshot.get("Engine").should.equal("postgres")
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-2"
)
snapshots = conn.describe_db_cluster_snapshots(
DBClusterIdentifier="db-primary-1"
).get("DBClusterSnapshots")
snapshots.should.have.length_of(2)
@mock_rds2
def test_delete_db_cluster_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
)
conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snapshot-1")
conn.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier="snapshot-1")
conn.describe_db_cluster_snapshots.when.called_with(
DBClusterSnapshotIdentifier="snapshot-1"
).should.throw(ClientError)
@mock_rds2
def test_restore_db_cluster_from_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
conn.describe_db_clusters()["DBClusters"].should.have.length_of(1)
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
)
# restore
new_cluster = conn.restore_db_cluster_from_snapshot(
DBClusterIdentifier="db-restore-1",
SnapshotIdentifier="snapshot-1",
Engine="postgres",
)["DBCluster"]
new_cluster["DBClusterIdentifier"].should.equal("db-restore-1")
new_cluster["DBClusterInstanceClass"].should.equal("db.m1.small")
new_cluster["StorageType"].should.equal("gp2")
new_cluster["Engine"].should.equal("postgres")
new_cluster["DatabaseName"].should.equal("staging-postgres")
new_cluster["Port"].should.equal(1234)
# Verify it exists
conn.describe_db_clusters()["DBClusters"].should.have.length_of(2)
conn.describe_db_clusters(DBClusterIdentifier="db-restore-1")[
"DBClusters"
].should.have.length_of(1)
@mock_rds2
def test_restore_db_cluster_from_snapshot_and_override_params():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_cluster(
DBClusterIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DBClusterInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2000",
Port=1234,
)
conn.describe_db_clusters()["DBClusters"].should.have.length_of(1)
conn.create_db_cluster_snapshot(
DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1"
)
# restore with some updated attributes
new_cluster = conn.restore_db_cluster_from_snapshot(
DBClusterIdentifier="db-restore-1",
SnapshotIdentifier="snapshot-1",
Engine="postgres",
Port=10000,
DBClusterInstanceClass="db.r6g.xlarge",
)["DBCluster"]
new_cluster["DBClusterIdentifier"].should.equal("db-restore-1")
new_cluster["DBClusterParameterGroup"].should.equal("default.aurora8.0")
new_cluster["DBClusterInstanceClass"].should.equal("db.r6g.xlarge")
new_cluster["Port"].should.equal(10000)