//! fjall adapter. //! //! Each relation gets a fjall [`PartitionHandle`](fjall::PartitionHandle) of //! the same name. A reserved partition named `__meta` carries per-relation //! metadata (arity and next synthetic row ID). //! //! fjall has no native cross-partition write transactions, so the adapter //! buffers inserts inside [`FjallTx`] and applies them when //! [`Transaction::commit`] is called; dropping the tx without commit discards //! the buffer. use std::collections::HashMap; use fjall::{Keyspace, PartitionCreateOptions, PartitionHandle}; use crate::codec::{decode_meta, decode_row, encode_meta, encode_row}; use crate::id::RowId; use crate::value::Value; use crate::{CommittedTx, RowStream, Storage, StorageError, Transaction, backend}; const META_PARTITION: &str = "__meta"; /// fjall-backed [`Storage`] implementation. pub struct FjallStorage { keyspace: Keyspace, meta: PartitionHandle, } impl FjallStorage { /// Open or create a fjall keyspace at `path`. /// /// # Errors /// Returns [`StorageError::Backend`] if fjall fails to open the path. pub fn open(path: impl AsRef) -> Result { let keyspace = fjall::Config::new(path).open().map_err(backend)?; let meta = keyspace .open_partition(META_PARTITION, PartitionCreateOptions::default()) .map_err(backend)?; Ok(Self { keyspace, meta }) } fn relation_partition(&self, name: &str) -> Result { self.keyspace .open_partition(name, PartitionCreateOptions::default()) .map_err(backend) } } impl Storage for FjallStorage { fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> { if name == META_PARTITION { return Err(StorageError::Validation(format!( "relation name '{name}' is reserved" ))); } if self.meta.contains_key(name.as_bytes()).map_err(backend)? { return Err(StorageError::RelationExists(name.to_string())); } let Ok(arity_u32) = u32::try_from(arity) else { unreachable!("arity exceeds u32::MAX") }; self.meta .insert(name.as_bytes(), encode_meta(arity_u32, 0)) .map_err(backend)?; let _ = self.relation_partition(name)?; Ok(()) } fn arity(&self, name: &str) -> Result { let raw = self .meta .get(name.as_bytes()) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let (arity, _) = decode_meta(raw.as_ref())?; Ok(arity as usize) } fn scan_iter<'a>(&'a self, name: &str) -> Result, StorageError> { let _ = self.arity(name)?; let partition = self.relation_partition(name)?; let iter = partition.iter().map(|res| { let (key, value) = res.map_err(backend)?; Ok((RowId::new(key.as_ref()), decode_row(value.as_ref())?)) }); Ok(Box::new(iter)) } fn transaction<'a>(&'a mut self) -> Result, StorageError> { Ok(Box::new(FjallTx { keyspace: &self.keyspace, meta: &self.meta, pending: Vec::new(), deletes: Vec::new(), next_ids: HashMap::new(), })) } } pub(crate) struct FjallTx<'a> { keyspace: &'a Keyspace, meta: &'a PartitionHandle, pending: Vec<(String, RowId, Vec)>, deletes: Vec<(String, RowId)>, next_ids: HashMap, } impl FjallTx<'_> { fn meta_for(&mut self, name: &str) -> Result<(u32, u64), StorageError> { if let Some(&entry) = self.next_ids.get(name) { return Ok(entry); } let raw = self .meta .get(name.as_bytes()) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let entry = decode_meta(raw.as_ref())?; self.next_ids.insert(name.to_string(), entry); Ok(entry) } } impl Transaction for FjallTx<'_> { fn insert(&mut self, name: &str, row: Vec) -> Result { let (arity, next_id) = self.meta_for(name)?; if row.len() != arity as usize { return Err(StorageError::ArityMismatch { expected: arity as usize, got: row.len(), }); } let id = RowId::from(next_id); self.next_ids.insert(name.to_string(), (arity, next_id + 1)); self.pending.push((name.to_string(), id.clone(), row)); Ok(id) } fn delete(&mut self, name: &str, id: &RowId) -> Result<(), StorageError> { if !self.meta.contains_key(name.as_bytes()).map_err(backend)? { return Err(StorageError::RelationNotFound(name.to_string())); } self.deletes.push((name.to_string(), id.clone())); Ok(()) } fn commit(self: Box) -> Result { let FjallTx { keyspace, meta, pending, deletes, next_ids, } = *self; for (name, id, row) in pending { let partition = keyspace .open_partition(&name, PartitionCreateOptions::default()) .map_err(backend)?; partition .insert(id.as_bytes(), encode_row(&row)) .map_err(backend)?; } for (name, id) in deletes { let partition = keyspace .open_partition(&name, PartitionCreateOptions::default()) .map_err(backend)?; partition.remove(id.as_bytes()).map_err(backend)?; } for (name, (arity, next_id)) in next_ids { meta.insert(name.as_bytes(), encode_meta(arity, next_id)) .map_err(backend)?; } Ok(CommittedTx::empty()) } } #[cfg(test)] mod tests { use super::{FjallStorage, backend}; use crate::value::Value; use crate::{Storage, StorageError}; fn i(x: i64) -> Value { Value::Int(x) } fn open_temp() -> Result { let dir = tempfile::tempdir().map_err(backend)?; let storage = FjallStorage::open(dir.path())?; std::mem::forget(dir); Ok(storage) } #[test] fn create_insert_scan_roundtrip() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let id0 = storage.insert("edge", vec![i(1), i(2)])?; let id1 = storage.insert("edge", vec![i(2), i(3)])?; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(id0, vec![i(1), i(2)]), (id1, vec![i(2), i(3)])]); assert_eq!(storage.arity("edge")?, 2); Ok(()) } #[test] fn batched_inserts_share_one_commit() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; let (a, b) = { let mut tx = storage.transaction()?; let a = tx.insert("edge", vec![i(1), i(2)])?; let b = tx.insert("edge", vec![i(3), i(4)])?; tx.commit()?; (a, b) }; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(a, vec![i(1), i(2)]), (b, vec![i(3), i(4)])]); Ok(()) } #[test] fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; { let mut tx = storage.transaction()?; tx.insert("edge", vec![i(1), i(2)])?; } assert!(storage.scan("edge")?.is_empty()); Ok(()) } #[test] fn delete_removes_row() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 1)?; let a = storage.insert("edge", vec![i(1)])?; let b = storage.insert("edge", vec![i(2)])?; storage.delete("edge", &a)?; let rows = storage.scan("edge")?; assert_eq!(rows, vec![(b, vec![i(2)])]); storage.delete("edge", &a)?; Ok(()) } #[test] fn duplicate_create_returns_err() -> Result<(), StorageError> { let mut storage = open_temp()?; storage.create_relation("edge", 2)?; assert!(matches!( storage.create_relation("edge", 2), Err(StorageError::RelationExists(_)) )); Ok(()) } }