RDS: Allow copying snapshots of deleted DB instances (#7392)

This commit is contained in:
Oran Avraham 2024-02-25 23:20:32 +02:00 committed by GitHub
parent 14b3db77b9
commit dbfa456dda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 6 deletions

View File

@ -1782,14 +1782,18 @@ class RDSBackend(BaseBackend):
def create_db_snapshot( def create_db_snapshot(
self, self,
db_instance_identifier: str, db_instance: Union[str, Database],
db_snapshot_identifier: str, db_snapshot_identifier: str,
snapshot_type: str = "manual", snapshot_type: str = "manual",
tags: Optional[List[Dict[str, str]]] = None, tags: Optional[List[Dict[str, str]]] = None,
) -> DatabaseSnapshot: ) -> DatabaseSnapshot:
database = self.databases.get(db_instance_identifier) if isinstance(db_instance, str):
if not database: database = self.databases.get(db_instance)
raise DBInstanceNotFoundError(db_instance_identifier) if not database:
raise DBInstanceNotFoundError(db_instance)
else:
database = db_instance
if db_snapshot_identifier in self.database_snapshots: if db_snapshot_identifier in self.database_snapshots:
raise DBSnapshotAlreadyExistsError(db_snapshot_identifier) raise DBSnapshotAlreadyExistsError(db_snapshot_identifier)
if len(self.database_snapshots) >= int( if len(self.database_snapshots) >= int(
@ -1821,7 +1825,7 @@ class RDSBackend(BaseBackend):
else: else:
tags = self._merge_tags(source_snapshot.tags, tags) tags = self._merge_tags(source_snapshot.tags, tags)
return self.create_db_snapshot( return self.create_db_snapshot(
db_instance_identifier=source_snapshot.database.db_instance_identifier, # type: ignore db_instance=source_snapshot.database,
db_snapshot_identifier=target_snapshot_identifier, db_snapshot_identifier=target_snapshot_identifier,
tags=tags, tags=tags,
) )

View File

@ -1039,8 +1039,9 @@ def test_create_db_snapshots_with_tags():
] ]
@pytest.mark.parametrize("delete_db_instance", [True, False])
@mock_aws @mock_aws
def test_copy_db_snapshots(): def test_copy_db_snapshots(delete_db_instance: bool):
conn = boto3.client("rds", region_name=DEFAULT_REGION) conn = boto3.client("rds", region_name=DEFAULT_REGION)
conn.create_db_instance( conn.create_db_instance(
@ -1059,6 +1060,10 @@ def test_copy_db_snapshots():
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
).get("DBSnapshot") ).get("DBSnapshot")
if delete_db_instance:
# Delete the original instance, but the copy snapshot operation should still succeed.
conn.delete_db_instance(DBInstanceIdentifier="db-primary-1")
target_snapshot = conn.copy_db_snapshot( target_snapshot = conn.copy_db_snapshot(
SourceDBSnapshotIdentifier="snapshot-1", TargetDBSnapshotIdentifier="snapshot-2" SourceDBSnapshotIdentifier="snapshot-1", TargetDBSnapshotIdentifier="snapshot-2"
).get("DBSnapshot") ).get("DBSnapshot")