RDS - Implement DeletionProtection for Instances/Clusters (#4614)

This commit is contained in:
Bert Blommers 2021-11-22 16:07:05 -01:00 committed by GitHub
parent 34a3a03475
commit ea67a15dcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 2 deletions

View File

@ -37,6 +37,7 @@ class Cluster:
def __init__(self, **kwargs):
self.db_name = kwargs.get("db_name")
self.db_cluster_identifier = kwargs.get("db_cluster_identifier")
self.deletion_protection = kwargs.get("deletion_protection")
self.engine = kwargs.get("engine")
self.engine_version = kwargs.get("engine_version")
if not self.engine_version:
@ -133,7 +134,7 @@ class Cluster:
<AssociatedRoles></AssociatedRoles>
<IAMDatabaseAuthenticationEnabled>false</IAMDatabaseAuthenticationEnabled>
<EngineMode>{{ cluster.engine_mode }}</EngineMode>
<DeletionProtection>false</DeletionProtection>
<DeletionProtection>{{ 'true' if cluster.deletion_protection else 'false' }}</DeletionProtection>
<HttpEndpointEnabled>false</HttpEndpointEnabled>
<CopyTagsToSnapshot>false</CopyTagsToSnapshot>
<CrossAccountClone>false</CrossAccountClone>
@ -264,6 +265,7 @@ class Database(CloudFormationModel):
)
self.dbi_resource_id = "db-M5ENSHXFPU6XHZ4G4ZEI5QIO2U"
self.tags = kwargs.get("tags", [])
self.deletion_protection = kwargs.get("deletion_protection", False)
@property
def db_instance_arn(self):
@ -425,6 +427,7 @@ class Database(CloudFormationModel):
</Tag>
{%- endfor -%}
</TagList>
<DeletionProtection>{{ 'true' if database.deletion_protection else 'false' }}</DeletionProtection>
</DBInstance>"""
)
return template.render(database=self)
@ -1120,6 +1123,10 @@ class RDS2Backend(BaseBackend):
def delete_database(self, db_instance_identifier, db_snapshot_name=None):
if db_instance_identifier in self.databases:
if self.databases[db_instance_identifier].deletion_protection:
raise InvalidParameterValue(
"Can't delete Instance with protection enabled"
)
if db_snapshot_name:
self.create_snapshot(db_instance_identifier, db_snapshot_name)
database = self.databases.pop(db_instance_identifier)
@ -1433,7 +1440,13 @@ class RDS2Backend(BaseBackend):
return self.clusters.values()
def delete_db_cluster(self, cluster_identifier):
return self.clusters.pop(cluster_identifier)
if cluster_identifier in self.clusters:
if self.clusters[cluster_identifier].deletion_protection:
raise InvalidParameterValue(
"Can't delete Cluster with protection enabled"
)
return self.clusters.pop(cluster_identifier)
raise DBClusterNotFoundError(cluster_identifier)
def start_db_cluster(self, cluster_identifier):
if cluster_identifier not in self.clusters:

View File

@ -51,6 +51,7 @@ class RDS2Response(BaseResponse):
"VpcSecurityGroupIds.VpcSecurityGroupId"
),
"tags": list(),
"deletion_protection": self._get_bool_param("DeletionProtection"),
}
args["tags"] = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
return args
@ -93,6 +94,7 @@ class RDS2Response(BaseResponse):
),
"db_name": self._get_param("DatabaseName"),
"db_cluster_identifier": self._get_param("DBClusterIdentifier"),
"deletion_protection": self._get_bool_param("DeletionProtection"),
"engine": self._get_param("Engine"),
"engine_version": self._get_param("EngineVersion"),
"engine_mode": self._get_param("EngineMode"),

View File

@ -39,6 +39,23 @@ def test_create_database():
db_instance["CopyTagsToSnapshot"].should.equal(False)
db_instance["InstanceCreateTime"].should.be.a("datetime.datetime")
db_instance["VpcSecurityGroups"][0]["VpcSecurityGroupId"].should.equal("sg-123456")
db_instance["DeletionProtection"].should.equal(False)
@mock_rds2
def test_database_with_deletion_protection_cannot_be_deleted():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DeletionProtection=True,
)
db_instance = database["DBInstance"]
db_instance["DBInstanceClass"].should.equal("db.m1.small")
db_instance["DeletionProtection"].should.equal(True)
@mock_rds2
@ -302,6 +319,7 @@ def test_get_databases():
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
DeletionProtection=True,
)
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(2)
@ -309,10 +327,14 @@ def test_get_databases():
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
list(instances["DBInstances"]).should.have.length_of(1)
instances["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-master-1")
instances["DBInstances"][0]["DeletionProtection"].should.equal(False)
instances["DBInstances"][0]["DBInstanceArn"].should.equal(
"arn:aws:rds:us-west-2:{}:db:db-master-1".format(ACCOUNT_ID)
)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2")
instances["DBInstances"][0]["DeletionProtection"].should.equal(True)
@mock_rds2
def test_get_databases_paginated():
@ -863,6 +885,23 @@ def test_modify_non_existent_option_group():
).should.throw(ClientError, "Specified OptionGroupName: non-existent not found.")
@mock_rds2
def test_delete_database_with_protection():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
DeletionProtection=True,
)
with pytest.raises(ClientError) as exc:
conn.delete_db_instance(DBInstanceIdentifier="db-primary-1")
err = exc.value.response["Error"]
err["Message"].should.equal("Can't delete Instance with protection enabled")
@mock_rds2
def test_delete_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")

View File

@ -120,6 +120,7 @@ def test_create_db_cluster__verify_default_properties():
cluster.should.have.key("HttpEndpointEnabled").equal(False)
cluster.should.have.key("CopyTagsToSnapshot").equal(False)
cluster.should.have.key("CrossAccountClone").equal(False)
cluster.should.have.key("DeletionProtection").equal(False)
cluster.should.have.key("DomainMemberships").equal([])
cluster.should.have.key("TagList").equal([])
cluster.should.have.key("ClusterCreateTime")
@ -155,6 +156,7 @@ def test_create_db_cluster_additional_parameters():
MasterUsername="root",
MasterUserPassword="hunter2_",
Port=1234,
DeletionProtection=True,
)
cluster = resp["DBCluster"]
@ -164,6 +166,7 @@ def test_create_db_cluster_additional_parameters():
cluster.should.have.key("EngineVersion").equal("5.6.mysql_aurora.1.19.2")
cluster.should.have.key("EngineMode").equal("serverless")
cluster.should.have.key("Port").equal(1234)
cluster.should.have.key("DeletionProtection").equal(True)
@mock_rds2
@ -207,6 +210,35 @@ def test_delete_db_cluster():
client.describe_db_clusters()["DBClusters"].should.have.length_of(0)
@mock_rds2
def test_delete_db_cluster_that_is_protected():
client = boto3.client("rds", region_name="eu-north-1")
client.create_db_cluster(
DBClusterIdentifier="cluster-id",
Engine="aurora",
MasterUsername="root",
MasterUserPassword="hunter2_",
DeletionProtection=True,
)
with pytest.raises(ClientError) as exc:
client.delete_db_cluster(DBClusterIdentifier="cluster-id")
err = exc.value.response["Error"]
err["Message"].should.equal("Can't delete Cluster with protection enabled")
@mock_rds2
def test_delete_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")
with pytest.raises(ClientError) as ex:
client.delete_db_cluster(DBClusterIdentifier="cluster-unknown")
err = ex.value.response["Error"]
err["Code"].should.equal("DBClusterNotFoundFault")
err["Message"].should.equal("DBCluster cluster-unknown not found.")
@mock_rds2
def test_start_db_cluster_unknown_cluster():
client = boto3.client("rds", region_name="eu-north-1")