//! redb adapter. //! //! Each relation gets a redb table named after it, keyed by `u64` row IDs. //! A reserved table named `__meta`, keyed by relation name, carries per-relation //! metadata (arity and next synthetic row ID). //! //! [`RedbTx`] wraps a real `redb::WriteTransaction`. Inserts go through the //! transaction; [`Transaction::commit`] commits it; dropping the tx without //! commit rolls back (redb's `WriteTransaction` drops the pending writes). use std::collections::HashMap; use redb::{Database, ReadableTable, TableDefinition, WriteTransaction}; use crate::codec::{decode_meta, decode_row, encode_meta, encode_row}; use crate::id::RowId; use crate::value::Value; use crate::{backend, CommittedTx, RowStream, Storage, StorageError, Transaction}; const META_TABLE: &str = "__meta"; fn meta_def() -> TableDefinition<'static, &'static str, &'static [u8]> { TableDefinition::new(META_TABLE) } fn rows_def(name: &str) -> TableDefinition<'_, u64, &'static [u8]> { TableDefinition::new(name) } /// redb-backed [`Storage`] implementation. pub struct RedbStorage { db: Database, } impl RedbStorage { /// Open or create a redb database at `path`. /// /// # Errors /// Returns [`StorageError::Backend`] if redb fails to open the file. pub fn open(path: impl AsRef) -> Result { let db = Database::create(path).map_err(backend)?; Ok(Self { db }) } } impl Storage for RedbStorage { fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> { if name == META_TABLE { return Err(StorageError::Validation(format!( "relation name '{name}' is reserved" ))); } let Ok(arity_u32) = u32::try_from(arity) else { unreachable!("arity exceeds u32::MAX") }; let txn = self.db.begin_write().map_err(backend)?; { let mut meta = txn.open_table(meta_def()).map_err(backend)?; if meta.get(name).map_err(backend)?.is_some() { return Err(StorageError::RelationExists(name.to_string())); } let encoded = encode_meta(arity_u32, 0); meta.insert(name, &encoded[..]).map_err(backend)?; let _ = txn.open_table(rows_def(name)).map_err(backend)?; } txn.commit().map_err(backend)?; Ok(()) } fn arity(&self, name: &str) -> Result { let txn = self.db.begin_read().map_err(backend)?; let meta = txn.open_table(meta_def()).map_err(backend)?; let entry = meta .get(name) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let (arity, _) = decode_meta(entry.value())?; Ok(arity as usize) } fn scan_iter<'a>(&'a self, name: &str) -> Result, StorageError> { let txn = self.db.begin_read().map_err(backend)?; let meta = txn.open_table(meta_def()).map_err(backend)?; if meta.get(name).map_err(backend)?.is_none() { return Err(StorageError::RelationNotFound(name.to_string())); } let table = txn.open_table(rows_def(name)).map_err(backend)?; let mut rows = Vec::new(); for entry in table.iter().map_err(backend)? { let (key, value) = entry.map_err(backend)?; let id = RowId::from(key.value()); rows.push((id, decode_row(value.value())?)); } Ok(Box::new(rows.into_iter().map(Ok))) } fn transaction<'a>(&'a mut self) -> Result, StorageError> { let wtxn = self.db.begin_write().map_err(backend)?; Ok(Box::new(RedbTx { wtxn: Some(wtxn), next_ids: HashMap::new(), })) } } pub(crate) struct RedbTx { wtxn: Option, next_ids: HashMap, } impl RedbTx { /// Borrow the live `WriteTransaction`. Panics if commit already /// consumed it: unreachable via the public API since /// [`Transaction::commit`] consumes the boxed tx. fn live(&self) -> &WriteTransaction { match self.wtxn.as_ref() { Some(t) => t, None => unreachable!("transaction was already committed"), } } fn meta_for(&mut self, name: &str) -> Result<(u32, u64), StorageError> { if let Some(&entry) = self.next_ids.get(name) { return Ok(entry); } let decoded = { let meta = self.live().open_table(meta_def()).map_err(backend)?; let entry = meta .get(name) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; decode_meta(entry.value())? }; self.next_ids.insert(name.to_string(), decoded); Ok(decoded) } } fn row_id_as_u64(id: &RowId) -> Result { let bytes = id.as_bytes(); if bytes.len() != 8 { return Err(StorageError::Backend( format!("redb row id must be 8 bytes, got {}", bytes.len()).into(), )); } let mut buf = [0u8; 8]; buf.copy_from_slice(bytes); Ok(u64::from_be_bytes(buf)) } impl Transaction for RedbTx { fn insert(&mut self, name: &str, row: Vec) -> Result { let (arity, next_id) = self.meta_for(name)?; if row.len() != arity as usize { return Err(StorageError::ArityMismatch { expected: arity as usize, got: row.len(), }); } { let mut rows = self.live().open_table(rows_def(name)).map_err(backend)?; let encoded = encode_row(&row); rows.insert(next_id, &encoded[..]).map_err(backend)?; } self.next_ids.insert(name.to_string(), (arity, next_id + 1)); Ok(RowId::from(next_id)) } fn delete(&mut self, name: &str, id: &RowId) -> Result<(), StorageError> { let key = row_id_as_u64(id)?; let wtxn = self.live(); // Verify the relation exists by checking meta. let meta = wtxn.open_table(meta_def()).map_err(backend)?; if meta.get(name).map_err(backend)?.is_none() { return Err(StorageError::RelationNotFound(name.to_string())); } drop(meta); let mut rows = wtxn.open_table(rows_def(name)).map_err(backend)?; let _ = rows.remove(key).map_err(backend)?; Ok(()) } fn commit(self: Box) -> Result { let mut this = self; let Some(wtxn) = this.wtxn.take() else { unreachable!("transaction was already committed") }; { let mut meta = wtxn.open_table(meta_def()).map_err(backend)?; for (name, (arity, next_id)) in this.next_ids.drain() { let encoded = encode_meta(arity, next_id); meta.insert(name.as_str(), &encoded[..]).map_err(backend)?; } } wtxn.commit().map_err(backend)?; Ok(CommittedTx::empty()) } } #[cfg(test)] mod tests { use super::{backend, RedbStorage}; use crate::value::Value; use crate::{Storage, StorageError}; fn i(x: i64) -> Value { Value::Int(x) } fn s(x: &str) -> Value { Value::Str(x.to_string()) } fn open_temp() -> Result { let dir = tempfile::tempdir().map_err(backend)?; let path = dir.path().join("test.redb"); let storage = RedbStorage::open(&path)?; std::mem::forget(dir); Ok(storage) } #[test] fn create_insert_scan_roundtrip() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let id0 = storage.insert("edge", vec![i(1), i(2)])?; let id1 = storage.insert("edge", vec![s("hello"), i(7)])?; let rows = storage.scan("edge")?; assert_eq!( rows, vec![(id0, vec![i(1), i(2)]), (id1, vec![s("hello"), i(7)])], ); assert_eq!(storage.arity("edge")?, 2); Ok(()) } #[test] fn batched_inserts_share_one_commit() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let (a, b) = { let mut tx = storage.transaction()?; let a = tx.insert("edge", vec![i(1), i(2)])?; let b = tx.insert("edge", vec![i(3), i(4)])?; tx.commit()?; (a, b) }; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(a, vec![i(1), i(2)]), (b, vec![i(3), i(4)])]); Ok(()) } #[test] fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; { let mut tx = storage.transaction()?; tx.insert("edge", vec![i(1), i(2)])?; } assert!(storage.scan("edge")?.is_empty()); Ok(()) } #[test] fn delete_removes_row() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 1)?; let a = storage.insert("edge", vec![i(1)])?; let b = storage.insert("edge", vec![i(2)])?; storage.delete("edge", &a)?; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(b, vec![i(2)])]); storage.delete("edge", &a)?; Ok(()) } #[test] fn duplicate_create_returns_err() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; assert!(matches!( storage.create_relation("edge", 2), Err(StorageError::RelationExists(_)) )); Ok(()) } #[test] fn insert_wrong_arity_returns_err() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; assert!(matches!( storage.insert("edge", vec![i(1)]), Err(StorageError::ArityMismatch { expected: 2, got: 1, }) )); Ok(()) } }