327 lines
11 KiB
Rust
327 lines
11 KiB
Rust
//! LMDB adapter via the `heed` crate.
|
|
//!
|
|
//! Maps each relation onto a named LMDB sub-database of the same name. A
|
|
//! reserved sub-database named `__meta` carries per-relation metadata (arity
|
|
//! and next synthetic row ID).
|
|
//!
|
|
//! [`LmdbTx`] wraps a real `heed::RwTxn`. Inserts go through the transaction;
|
|
//! [`Transaction::commit`] commits it; dropping the tx without commit lets
|
|
//! `heed` abort the transaction.
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use heed::types::Bytes;
|
|
use heed::{Database, Env, EnvOpenOptions, RwTxn};
|
|
|
|
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
|
use crate::id::RowId;
|
|
use crate::value::Value;
|
|
use crate::{CommittedTx, RowStream, Storage, StorageError, Transaction, backend};
|
|
|
|
/// Reserved sub-database holding per-relation metadata (arity and next
/// synthetic row ID). No relation may be created under this name.
const META_DB: &str = "__meta";
/// Upper bound on named sub-databases passed to LMDB. `__meta` occupies one
/// slot, so at most 127 relations can be created in one environment.
const DEFAULT_MAX_DBS: u32 = 128;
/// Memory-map size handed to LMDB, which caps total on-disk size: 100 MiB.
const DEFAULT_MAP_SIZE: usize = 100 << 20;
|
|
|
|
/// LMDB-backed [`Storage`] implementation.
|
|
pub struct LmdbStorage {
|
|
env: Env,
|
|
meta: Database<Bytes, Bytes>,
|
|
}
|
|
|
|
impl LmdbStorage {
|
|
/// Open or create an LMDB environment at `path`.
|
|
///
|
|
/// # Errors
|
|
/// Returns [`StorageError::Backend`] if LMDB fails to open.
|
|
///
|
|
/// # Safety
|
|
/// Uses `EnvOpenOptions::open`, which `heed` marks unsafe because the
|
|
/// memory-mapped file's contents can be modified by other processes,
|
|
/// violating Rust's aliasing rules. This adapter assumes single-process
|
|
/// exclusive access to the path.
|
|
#[allow(unsafe_code)]
|
|
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
|
// SAFETY: see method-level doc above.
|
|
let env = unsafe {
|
|
EnvOpenOptions::new()
|
|
.max_dbs(DEFAULT_MAX_DBS)
|
|
.map_size(DEFAULT_MAP_SIZE)
|
|
.open(path)
|
|
.map_err(backend)?
|
|
};
|
|
let mut wtxn = env.write_txn().map_err(backend)?;
|
|
let meta: Database<Bytes, Bytes> = env
|
|
.create_database(&mut wtxn, Some(META_DB))
|
|
.map_err(backend)?;
|
|
wtxn.commit().map_err(backend)?;
|
|
Ok(Self { env, meta })
|
|
}
|
|
}
|
|
|
|
impl Storage for LmdbStorage {
|
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
if name == META_DB {
|
|
return Err(StorageError::Validation(format!(
|
|
"relation name '{name}' is reserved"
|
|
)));
|
|
}
|
|
let Ok(arity_u32) = u32::try_from(arity) else {
|
|
unreachable!("arity exceeds u32::MAX")
|
|
};
|
|
let mut wtxn = self.env.write_txn().map_err(backend)?;
|
|
if self
|
|
.meta
|
|
.get(&wtxn, name.as_bytes())
|
|
.map_err(backend)?
|
|
.is_some()
|
|
{
|
|
return Err(StorageError::RelationExists(name.to_string()));
|
|
}
|
|
let encoded = encode_meta(arity_u32, 0);
|
|
self.meta
|
|
.put(&mut wtxn, name.as_bytes(), &encoded[..])
|
|
.map_err(backend)?;
|
|
let _ = self
|
|
.env
|
|
.create_database::<Bytes, Bytes>(&mut wtxn, Some(name))
|
|
.map_err(backend)?;
|
|
wtxn.commit().map_err(backend)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
let rtxn = self.env.read_txn().map_err(backend)?;
|
|
let raw = self
|
|
.meta
|
|
.get(&rtxn, name.as_bytes())
|
|
.map_err(backend)?
|
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
let (arity, _) = decode_meta(raw)?;
|
|
Ok(arity as usize)
|
|
}
|
|
|
|
fn scan_iter<'a>(&'a self, name: &str) -> Result<RowStream<'a>, StorageError> {
|
|
let rtxn = self.env.read_txn().map_err(backend)?;
|
|
if self
|
|
.meta
|
|
.get(&rtxn, name.as_bytes())
|
|
.map_err(backend)?
|
|
.is_none()
|
|
{
|
|
return Err(StorageError::RelationNotFound(name.to_string()));
|
|
}
|
|
let db: Database<Bytes, Bytes> = self
|
|
.env
|
|
.open_database(&rtxn, Some(name))
|
|
.map_err(backend)?
|
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
let mut rows = Vec::new();
|
|
for entry in db.iter(&rtxn).map_err(backend)? {
|
|
let (key, value) = entry.map_err(backend)?;
|
|
rows.push((RowId::new(key), decode_row(value)?));
|
|
}
|
|
Ok(Box::new(rows.into_iter().map(Ok)))
|
|
}
|
|
|
|
fn transaction<'a>(&'a mut self) -> Result<Box<dyn Transaction + 'a>, StorageError> {
|
|
let wtxn = self.env.write_txn().map_err(backend)?;
|
|
Ok(Box::new(LmdbTx {
|
|
env: &self.env,
|
|
meta: self.meta,
|
|
wtxn: Some(wtxn),
|
|
dbs: HashMap::new(),
|
|
next_ids: HashMap::new(),
|
|
}))
|
|
}
|
|
}
|
|
|
|
pub(crate) struct LmdbTx<'a> {
|
|
env: &'a Env,
|
|
meta: Database<Bytes, Bytes>,
|
|
wtxn: Option<RwTxn<'a>>,
|
|
/// Per-relation sub-database handles opened within this transaction.
|
|
dbs: HashMap<String, Database<Bytes, Bytes>>,
|
|
next_ids: HashMap<String, (u32, u64)>,
|
|
}
|
|
|
|
impl Transaction for LmdbTx<'_> {
|
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<RowId, StorageError> {
|
|
// Load meta on first access to this relation; subsequent calls within
|
|
// the tx read the cached entry.
|
|
let (arity, next_id) = if let Some(&entry) = self.next_ids.get(name) {
|
|
entry
|
|
} else {
|
|
let Some(wtxn) = self.wtxn.as_ref() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
let raw = self
|
|
.meta
|
|
.get(wtxn, name.as_bytes())
|
|
.map_err(backend)?
|
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
let entry = decode_meta(raw)?;
|
|
self.next_ids.insert(name.to_string(), entry);
|
|
entry
|
|
};
|
|
if row.len() != arity as usize {
|
|
return Err(StorageError::ArityMismatch {
|
|
expected: arity as usize,
|
|
got: row.len(),
|
|
});
|
|
}
|
|
// Open the per-relation sub-database (cached for subsequent inserts).
|
|
let db = if let Some(&db) = self.dbs.get(name) {
|
|
db
|
|
} else {
|
|
let Some(wtxn) = self.wtxn.as_mut() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
let db = self
|
|
.env
|
|
.create_database::<Bytes, Bytes>(wtxn, Some(name))
|
|
.map_err(backend)?;
|
|
self.dbs.insert(name.to_string(), db);
|
|
db
|
|
};
|
|
let key = row_key(next_id);
|
|
let value = encode_row(&row);
|
|
let Some(wtxn) = self.wtxn.as_mut() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
db.put(wtxn, &key[..], &value[..]).map_err(backend)?;
|
|
self.next_ids.insert(name.to_string(), (arity, next_id + 1));
|
|
Ok(RowId::from(next_id))
|
|
}
|
|
|
|
fn delete(&mut self, name: &str, id: &RowId) -> Result<(), StorageError> {
|
|
// Verify relation existence via meta.
|
|
let Some(wtxn) = self.wtxn.as_ref() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
if self
|
|
.meta
|
|
.get(wtxn, name.as_bytes())
|
|
.map_err(backend)?
|
|
.is_none()
|
|
{
|
|
return Err(StorageError::RelationNotFound(name.to_string()));
|
|
}
|
|
// Open or reuse the per-relation sub-database.
|
|
let db = if let Some(&db) = self.dbs.get(name) {
|
|
db
|
|
} else {
|
|
let Some(wtxn) = self.wtxn.as_mut() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
let db = self
|
|
.env
|
|
.create_database::<Bytes, Bytes>(wtxn, Some(name))
|
|
.map_err(backend)?;
|
|
self.dbs.insert(name.to_string(), db);
|
|
db
|
|
};
|
|
let Some(wtxn) = self.wtxn.as_mut() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
let _ = db.delete(wtxn, id.as_bytes()).map_err(backend)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn commit(self: Box<Self>) -> Result<CommittedTx, StorageError> {
|
|
let mut this = self;
|
|
let Some(mut wtxn) = this.wtxn.take() else {
|
|
unreachable!("transaction was already committed")
|
|
};
|
|
for (name, (arity, next_id)) in this.next_ids.drain() {
|
|
let encoded = encode_meta(arity, next_id);
|
|
this.meta
|
|
.put(&mut wtxn, name.as_bytes(), &encoded[..])
|
|
.map_err(backend)?;
|
|
}
|
|
wtxn.commit().map_err(backend)?;
|
|
Ok(CommittedTx::empty())
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::{LmdbStorage, backend};
    use crate::value::Value;
    use crate::{Storage, StorageError};

    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Create a fresh temporary directory to host one test's LMDB environment.
    ///
    /// Bind the returned guard *before* opening the storage. Locals drop in
    /// reverse declaration order, so the storage (and its memory map) is
    /// released first. The guard then deletes the directory, so nothing is
    /// left behind in the temp dir.
    fn temp_dir() -> Result<TempDir, StorageError> {
        tempfile::tempdir().map_err(backend)
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 2)?;
        let id0 = storage.insert("edge", vec![i(1), i(2)])?;
        let id1 = storage.insert("edge", vec![i(2), i(3)])?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![(id0, vec![i(1), i(2)]), (id1, vec![i(2), i(3)])]);
        assert_eq!(storage.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn batched_inserts_share_one_commit() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 2)?;
        let (a, b) = {
            let mut tx = storage.transaction()?;
            let a = tx.insert("edge", vec![i(1), i(2)])?;
            let b = tx.insert("edge", vec![i(3), i(4)])?;
            tx.commit()?;
            (a, b)
        };
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![(a, vec![i(1), i(2)]), (b, vec![i(3), i(4)])]);
        Ok(())
    }

    #[test]
    fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 2)?;
        {
            let mut tx = storage.transaction()?;
            tx.insert("edge", vec![i(1), i(2)])?;
        }
        assert!(storage.scan("edge")?.is_empty());
        Ok(())
    }

    #[test]
    fn delete_removes_row() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 1)?;
        let a = storage.insert("edge", vec![i(1)])?;
        let b = storage.insert("edge", vec![i(2)])?;
        storage.delete("edge", &a)?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![(b, vec![i(2)])]);
        // Deleting an already-removed row is a no-op, not an error.
        storage.delete("edge", &a)?;
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.create_relation("edge", 2),
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }

    #[test]
    fn reserved_meta_name_is_rejected() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        assert!(matches!(
            storage.create_relation("__meta", 1),
            Err(StorageError::Validation(_))
        ));
        Ok(())
    }

    #[test]
    fn insert_with_wrong_arity_is_rejected() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.insert("edge", vec![i(1)]),
            Err(StorageError::ArityMismatch { expected: 2, got: 1 })
        ));
        assert!(storage.scan("edge")?.is_empty());
        Ok(())
    }

    #[test]
    fn unknown_relation_is_reported() -> Result<(), StorageError> {
        let dir = temp_dir()?;
        let mut storage = LmdbStorage::open(dir.path())?;
        assert!(matches!(
            storage.arity("missing"),
            Err(StorageError::RelationNotFound(_))
        ));
        assert!(matches!(
            storage.insert("missing", vec![i(1)]),
            Err(StorageError::RelationNotFound(_))
        ));
        Ok(())
    }
}
|