//! LMDB adapter via the `heed` crate. //! //! Maps each relation onto a named LMDB sub-database of the same name. A //! reserved sub-database named `__meta` carries per-relation metadata (arity //! and next synthetic row ID). //! //! [`LmdbTx`] wraps a real `heed::RwTxn`. Inserts go through the transaction; //! [`Transaction::commit`] commits it; dropping the tx without commit lets //! `heed` abort the transaction. use std::collections::HashMap; use heed::types::Bytes; use heed::{Database, Env, EnvOpenOptions, RwTxn}; use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key}; use crate::id::RowId; use crate::value::Value; use crate::{CommittedTx, RowStream, Storage, StorageError, Transaction, backend}; const META_DB: &str = "__meta"; const DEFAULT_MAX_DBS: u32 = 128; const DEFAULT_MAP_SIZE: usize = 100 * 1024 * 1024; /// LMDB-backed [`Storage`] implementation. pub struct LmdbStorage { env: Env, meta: Database, } impl LmdbStorage { /// Open or create an LMDB environment at `path`. /// /// # Errors /// Returns [`StorageError::Backend`] if LMDB fails to open. /// /// # Safety /// Uses `EnvOpenOptions::open`, which `heed` marks unsafe because the /// memory-mapped file's contents can be modified by other processes, /// violating Rust's aliasing rules. This adapter assumes single-process /// exclusive access to the path. #[allow(unsafe_code)] pub fn open(path: impl AsRef) -> Result { // SAFETY: see method-level doc above. let env = unsafe { EnvOpenOptions::new() .max_dbs(DEFAULT_MAX_DBS) .map_size(DEFAULT_MAP_SIZE) .open(path) .map_err(backend)? }; let mut wtxn = env.write_txn().map_err(backend)?; let meta: Database = env .create_database(&mut wtxn, Some(META_DB)) .map_err(backend)?; wtxn.commit().map_err(backend)?; Ok(Self { env, meta }) } } impl Storage for LmdbStorage { fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> { if name == META_DB { return Err(StorageError::Validation(format!( "relation name '{name}' is reserved" ))); } let Ok(arity_u32) = u32::try_from(arity) else { unreachable!("arity exceeds u32::MAX") }; let mut wtxn = self.env.write_txn().map_err(backend)?; if self .meta .get(&wtxn, name.as_bytes()) .map_err(backend)? .is_some() { return Err(StorageError::RelationExists(name.to_string())); } let encoded = encode_meta(arity_u32, 0); self.meta .put(&mut wtxn, name.as_bytes(), &encoded[..]) .map_err(backend)?; let _ = self .env .create_database::(&mut wtxn, Some(name)) .map_err(backend)?; wtxn.commit().map_err(backend)?; Ok(()) } fn arity(&self, name: &str) -> Result { let rtxn = self.env.read_txn().map_err(backend)?; let raw = self .meta .get(&rtxn, name.as_bytes()) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let (arity, _) = decode_meta(raw)?; Ok(arity as usize) } fn scan_iter<'a>(&'a self, name: &str) -> Result, StorageError> { let rtxn = self.env.read_txn().map_err(backend)?; if self .meta .get(&rtxn, name.as_bytes()) .map_err(backend)? .is_none() { return Err(StorageError::RelationNotFound(name.to_string())); } let db: Database = self .env .open_database(&rtxn, Some(name)) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let mut rows = Vec::new(); for entry in db.iter(&rtxn).map_err(backend)? { let (key, value) = entry.map_err(backend)?; rows.push((RowId::new(key), decode_row(value)?)); } Ok(Box::new(rows.into_iter().map(Ok))) } fn transaction<'a>(&'a mut self) -> Result, StorageError> { let wtxn = self.env.write_txn().map_err(backend)?; Ok(Box::new(LmdbTx { env: &self.env, meta: self.meta, wtxn: Some(wtxn), dbs: HashMap::new(), next_ids: HashMap::new(), })) } } pub(crate) struct LmdbTx<'a> { env: &'a Env, meta: Database, wtxn: Option>, /// Per-relation sub-database handles opened within this transaction. dbs: HashMap>, next_ids: HashMap, } impl Transaction for LmdbTx<'_> { fn insert(&mut self, name: &str, row: Vec) -> Result { // Load meta on first access to this relation; subsequent calls within // the tx read the cached entry. let (arity, next_id) = if let Some(&entry) = self.next_ids.get(name) { entry } else { let Some(wtxn) = self.wtxn.as_ref() else { unreachable!("transaction was already committed") }; let encoded = self .meta .get(wtxn, name.as_bytes()) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let entry = decode_meta(encoded)?; self.next_ids.insert(name.to_string(), entry); entry }; if row.len() != arity as usize { return Err(StorageError::ArityMismatch { expected: arity as usize, got: row.len(), }); } // Open the per-relation sub-database (cached for subsequent inserts). let db = if let Some(&db) = self.dbs.get(name) { db } else { let Some(wtxn) = self.wtxn.as_mut() else { unreachable!("transaction was already committed") }; let db = self .env .create_database::(wtxn, Some(name)) .map_err(backend)?; self.dbs.insert(name.to_string(), db); db }; let key = row_key(next_id); let value = encode_row(&row); let Some(wtxn) = self.wtxn.as_mut() else { unreachable!("transaction was already committed") }; db.put(wtxn, &key[..], &value[..]).map_err(backend)?; self.next_ids.insert(name.to_string(), (arity, next_id + 1)); Ok(RowId::from(next_id)) } fn delete(&mut self, name: &str, id: &RowId) -> Result<(), StorageError> { // Verify relation existence via meta. let Some(wtxn) = self.wtxn.as_ref() else { unreachable!("transaction was already committed") }; if self .meta .get(wtxn, name.as_bytes()) .map_err(backend)? .is_none() { return Err(StorageError::RelationNotFound(name.to_string())); } // Open or reuse the per-relation sub-database. let db = if let Some(&db) = self.dbs.get(name) { db } else { let Some(wtxn) = self.wtxn.as_mut() else { unreachable!("transaction was already committed") }; let db = self .env .create_database::(wtxn, Some(name)) .map_err(backend)?; self.dbs.insert(name.to_string(), db); db }; let Some(wtxn) = self.wtxn.as_mut() else { unreachable!("transaction was already committed") }; let _ = db.delete(wtxn, id.as_bytes()).map_err(backend)?; Ok(()) } fn commit(self: Box) -> Result { let mut this = self; let Some(mut wtxn) = this.wtxn.take() else { unreachable!("transaction was already committed") }; for (name, (arity, next_id)) in this.next_ids.drain() { let encoded = encode_meta(arity, next_id); this.meta .put(&mut wtxn, name.as_bytes(), &encoded[..]) .map_err(backend)?; } wtxn.commit().map_err(backend)?; Ok(CommittedTx::empty()) } } #[cfg(test)] mod tests { use super::{LmdbStorage, backend}; use crate::value::Value; use crate::{Storage, StorageError}; fn i(x: i64) -> Value { Value::Int(x) } fn open_temp() -> Result { let dir = tempfile::tempdir().map_err(backend)?; let storage = LmdbStorage::open(dir.path())?; std::mem::forget(dir); Ok(storage) } #[test] fn create_insert_scan_roundtrip() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let id0 = storage.insert("edge", vec![i(1), i(2)])?; let id1 = storage.insert("edge", vec![i(2), i(3)])?; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(id0, vec![i(1), i(2)]), (id1, vec![i(2), i(3)])]); assert_eq!(storage.arity("edge")?, 2); Ok(()) } #[test] fn batched_inserts_share_one_commit() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let (a, b) = { let mut tx = storage.transaction()?; let a = tx.insert("edge", vec![i(1), i(2)])?; let b = tx.insert("edge", vec![i(3), i(4)])?; tx.commit()?; (a, b) }; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(a, vec![i(1), i(2)]), (b, vec![i(3), i(4)])]); Ok(()) } #[test] fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; { let mut tx = storage.transaction()?; tx.insert("edge", vec![i(1), i(2)])?; } assert!(storage.scan("edge")?.is_empty()); Ok(()) } #[test] fn delete_removes_row() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 1)?; let a = storage.insert("edge", vec![i(1)])?; let b = storage.insert("edge", vec![i(2)])?; storage.delete("edge", &a)?; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(b, vec![i(2)])]); storage.delete("edge", &a)?; Ok(()) } #[test] fn duplicate_create_returns_err() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; assert!(matches!( storage.create_relation("edge", 2), Err(StorageError::RelationExists(_)) )); Ok(()) } }