diff --git a/moto/rds2/models.py b/moto/rds2/models.py
index 52840a1a0..809c9feeb 100644
--- a/moto/rds2/models.py
+++ b/moto/rds2/models.py
@@ -37,6 +37,7 @@ class Cluster:
def __init__(self, **kwargs):
self.db_name = kwargs.get("db_name")
self.db_cluster_identifier = kwargs.get("db_cluster_identifier")
+ self.deletion_protection = kwargs.get("deletion_protection")
self.engine = kwargs.get("engine")
self.engine_version = kwargs.get("engine_version")
if not self.engine_version:
@@ -133,7 +134,7 @@ class Cluster:
false
{{ cluster.engine_mode }}
- false
+ {{ 'true' if cluster.deletion_protection else 'false' }}
false
false
false
@@ -264,6 +265,7 @@ class Database(CloudFormationModel):
)
self.dbi_resource_id = "db-M5ENSHXFPU6XHZ4G4ZEI5QIO2U"
self.tags = kwargs.get("tags", [])
+ self.deletion_protection = kwargs.get("deletion_protection", False)
@property
def db_instance_arn(self):
@@ -425,6 +427,7 @@ class Database(CloudFormationModel):
{%- endfor -%}
+ {{ 'true' if database.deletion_protection else 'false' }}
"""
)
return template.render(database=self)
@@ -1120,6 +1123,10 @@ class RDS2Backend(BaseBackend):
def delete_database(self, db_instance_identifier, db_snapshot_name=None):
if db_instance_identifier in self.databases:
+ if self.databases[db_instance_identifier].deletion_protection:
+ raise InvalidParameterValue(
+ "Can't delete Instance with protection enabled"
+ )
if db_snapshot_name:
self.create_snapshot(db_instance_identifier, db_snapshot_name)
database = self.databases.pop(db_instance_identifier)
@@ -1433,7 +1440,13 @@ class RDS2Backend(BaseBackend):
return self.clusters.values()
def delete_db_cluster(self, cluster_identifier):
- return self.clusters.pop(cluster_identifier)
+ if cluster_identifier in self.clusters:
+ if self.clusters[cluster_identifier].deletion_protection:
+ raise InvalidParameterValue(
+ "Can't delete Cluster with protection enabled"
+ )
+ return self.clusters.pop(cluster_identifier)
+ raise DBClusterNotFoundError(cluster_identifier)
def start_db_cluster(self, cluster_identifier):
if cluster_identifier not in self.clusters:
diff --git a/moto/rds2/responses.py b/moto/rds2/responses.py
index c27f88cda..7d075a342 100644
--- a/moto/rds2/responses.py
+++ b/moto/rds2/responses.py
@@ -51,6 +51,7 @@ class RDS2Response(BaseResponse):
"VpcSecurityGroupIds.VpcSecurityGroupId"
),
"tags": list(),
+ "deletion_protection": self._get_bool_param("DeletionProtection"),
}
args["tags"] = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
return args
@@ -93,6 +94,7 @@ class RDS2Response(BaseResponse):
),
"db_name": self._get_param("DatabaseName"),
"db_cluster_identifier": self._get_param("DBClusterIdentifier"),
+ "deletion_protection": self._get_bool_param("DeletionProtection"),
"engine": self._get_param("Engine"),
"engine_version": self._get_param("EngineVersion"),
"engine_mode": self._get_param("EngineMode"),
diff --git a/tests/test_rds2/test_rds2.py b/tests/test_rds2/test_rds2.py
index e7dba8c38..bce663f97 100644
--- a/tests/test_rds2/test_rds2.py
+++ b/tests/test_rds2/test_rds2.py
@@ -39,6 +39,23 @@ def test_create_database():
db_instance["CopyTagsToSnapshot"].should.equal(False)
db_instance["InstanceCreateTime"].should.be.a("datetime.datetime")
db_instance["VpcSecurityGroups"][0]["VpcSecurityGroupId"].should.equal("sg-123456")
+ db_instance["DeletionProtection"].should.equal(False)
+
+
+@mock_rds2
+def test_database_with_deletion_protection_cannot_be_deleted():
+ conn = boto3.client("rds", region_name="us-west-2")
+ database = conn.create_db_instance(
+ DBInstanceIdentifier="db-master-1",
+ AllocatedStorage=10,
+ Engine="postgres",
+ DBName="staging-postgres",
+ DBInstanceClass="db.m1.small",
+ DeletionProtection=True,
+ )
+ db_instance = database["DBInstance"]
+ db_instance["DBInstanceClass"].should.equal("db.m1.small")
+ db_instance["DeletionProtection"].should.equal(True)
@mock_rds2
@@ -302,6 +319,7 @@ def test_get_databases():
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
+ DeletionProtection=True,
)
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(2)
@@ -309,10 +327,14 @@ def test_get_databases():
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
list(instances["DBInstances"]).should.have.length_of(1)
instances["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-master-1")
+ instances["DBInstances"][0]["DeletionProtection"].should.equal(False)
instances["DBInstances"][0]["DBInstanceArn"].should.equal(
"arn:aws:rds:us-west-2:{}:db:db-master-1".format(ACCOUNT_ID)
)
+ instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2")
+ instances["DBInstances"][0]["DeletionProtection"].should.equal(True)
+
@mock_rds2
def test_get_databases_paginated():
@@ -863,6 +885,23 @@ def test_modify_non_existent_option_group():
).should.throw(ClientError, "Specified OptionGroupName: non-existent not found.")
+@mock_rds2
+def test_delete_database_with_protection():
+ conn = boto3.client("rds", region_name="us-west-2")
+ conn.create_db_instance(
+ DBInstanceIdentifier="db-primary-1",
+ AllocatedStorage=10,
+ Engine="postgres",
+ DBInstanceClass="db.m1.small",
+ DeletionProtection=True,
+ )
+
+ with pytest.raises(ClientError) as exc:
+ conn.delete_db_instance(DBInstanceIdentifier="db-primary-1")
+ err = exc.value.response["Error"]
+ err["Message"].should.equal("Can't delete Instance with protection enabled")
+
+
@mock_rds2
def test_delete_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")
diff --git a/tests/test_rds2/test_rds2_clusters.py b/tests/test_rds2/test_rds2_clusters.py
index 923369f93..0321fc079 100644
--- a/tests/test_rds2/test_rds2_clusters.py
+++ b/tests/test_rds2/test_rds2_clusters.py
@@ -120,6 +120,7 @@ def test_create_db_cluster__verify_default_properties():
cluster.should.have.key("HttpEndpointEnabled").equal(False)
cluster.should.have.key("CopyTagsToSnapshot").equal(False)
cluster.should.have.key("CrossAccountClone").equal(False)
+ cluster.should.have.key("DeletionProtection").equal(False)
cluster.should.have.key("DomainMemberships").equal([])
cluster.should.have.key("TagList").equal([])
cluster.should.have.key("ClusterCreateTime")
@@ -155,6 +156,7 @@ def test_create_db_cluster_additional_parameters():
MasterUsername="root",
MasterUserPassword="hunter2_",
Port=1234,
+ DeletionProtection=True,
)
cluster = resp["DBCluster"]
@@ -164,6 +166,7 @@ def test_create_db_cluster_additional_parameters():
cluster.should.have.key("EngineVersion").equal("5.6.mysql_aurora.1.19.2")
cluster.should.have.key("EngineMode").equal("serverless")
cluster.should.have.key("Port").equal(1234)
+ cluster.should.have.key("DeletionProtection").equal(True)
@mock_rds2
@@ -207,6 +210,35 @@ def test_delete_db_cluster():
client.describe_db_clusters()["DBClusters"].should.have.length_of(0)
+@mock_rds2
+def test_delete_db_cluster_that_is_protected():
+ client = boto3.client("rds", region_name="eu-north-1")
+
+ client.create_db_cluster(
+ DBClusterIdentifier="cluster-id",
+ Engine="aurora",
+ MasterUsername="root",
+ MasterUserPassword="hunter2_",
+ DeletionProtection=True,
+ )
+
+ with pytest.raises(ClientError) as exc:
+ client.delete_db_cluster(DBClusterIdentifier="cluster-id")
+ err = exc.value.response["Error"]
+ err["Message"].should.equal("Can't delete Cluster with protection enabled")
+
+
+@mock_rds2
+def test_delete_db_cluster_unknown_cluster():
+ client = boto3.client("rds", region_name="eu-north-1")
+
+ with pytest.raises(ClientError) as ex:
+ client.delete_db_cluster(DBClusterIdentifier="cluster-unknown")
+ err = ex.value.response["Error"]
+ err["Code"].should.equal("DBClusterNotFoundFault")
+ err["Message"].should.equal("DBCluster cluster-unknown not found.")
+
+
@mock_rds2
def test_start_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")