From dbfa456dda04f995479c995a361b60f521e1753e Mon Sep 17 00:00:00 2001 From: Oran Avraham <252748+oranav@users.noreply.github.com> Date: Sun, 25 Feb 2024 23:20:32 +0200 Subject: [PATCH] RDS: Allow copying snapshots of deleted DB instances (#7392) --- moto/rds/models.py | 14 +++++++++----- tests/test_rds/test_rds.py | 7 ++++++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/moto/rds/models.py b/moto/rds/models.py index 18ffab32f..ace7eaf05 100644 --- a/moto/rds/models.py +++ b/moto/rds/models.py @@ -1782,14 +1782,18 @@ class RDSBackend(BaseBackend): def create_db_snapshot( self, - db_instance_identifier: str, + db_instance: Union[str, Database], db_snapshot_identifier: str, snapshot_type: str = "manual", tags: Optional[List[Dict[str, str]]] = None, ) -> DatabaseSnapshot: - database = self.databases.get(db_instance_identifier) - if not database: - raise DBInstanceNotFoundError(db_instance_identifier) + if isinstance(db_instance, str): + database = self.databases.get(db_instance) + if not database: + raise DBInstanceNotFoundError(db_instance) + else: + database = db_instance + if db_snapshot_identifier in self.database_snapshots: raise DBSnapshotAlreadyExistsError(db_snapshot_identifier) if len(self.database_snapshots) >= int( @@ -1821,7 +1825,7 @@ class RDSBackend(BaseBackend): else: tags = self._merge_tags(source_snapshot.tags, tags) return self.create_db_snapshot( - db_instance_identifier=source_snapshot.database.db_instance_identifier, # type: ignore + db_instance=source_snapshot.database, db_snapshot_identifier=target_snapshot_identifier, tags=tags, ) diff --git a/tests/test_rds/test_rds.py b/tests/test_rds/test_rds.py index 4f0aa02ae..bf06b5645 100644 --- a/tests/test_rds/test_rds.py +++ b/tests/test_rds/test_rds.py @@ -1039,8 +1039,9 @@ def test_create_db_snapshots_with_tags(): ] +@pytest.mark.parametrize("delete_db_instance", [True, False]) @mock_aws -def test_copy_db_snapshots(): +def test_copy_db_snapshots(delete_db_instance: bool): conn = boto3.client("rds", region_name=DEFAULT_REGION) conn.create_db_instance( @@ -1059,6 +1060,10 @@ def test_copy_db_snapshots(): DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ).get("DBSnapshot") + if delete_db_instance: + # Delete the original instance, but the copy snapshot operation should still succeed. + conn.delete_db_instance(DBInstanceIdentifier="db-primary-1") + target_snapshot = conn.copy_db_snapshot( SourceDBSnapshotIdentifier="snapshot-1", TargetDBSnapshotIdentifier="snapshot-2" ).get("DBSnapshot")