//! Geomerge adapter.
//!
//! Geomerge schemas are immutable after store construction: there is no
//! public API to register a new table on a live `Store`. The adapter expects
//! all relations to be declared up front via a `FlatTheory` passed to
//! [`GeomergeStorage::from_theory`] (or a pre-built `Store` via
//! [`GeomergeStorage::from_store`]). [`Storage::create_relation`] then only
//! verifies that the relation exists and that its arity matches.
//!
//! ## Deletion
//!
//! Geomerge upstream's `txn::ops::Op` enum currently has only `Op::Add`;
//! there is no retract or remove op. The adapter therefore uses the default
//! [`Transaction::delete`] implementation, which returns
//! [`StorageError::Unsupported`].
//!
//! ## Row Identifier Encoding
//!
//! Geomerge's [`RowId`](geomerge::table::RowId) is `{ commit: CommitHash, counter: u32 }`.
//! The adapter uses two encodings under [`crate::id::RowId`]:
//!
//! - **Existing (36 bytes):** 32-byte commit hash followed by a 4-byte BE
//!   counter. Returned by [`Storage::scan`] and stable across calls.
//! - **Pending (4 bytes):** just a 4-byte BE counter, returned by
//!   [`Transaction::insert`] to reference an in-flight row from later
//!   inserts in the same transaction. Pending ids become invalid after
//!   commit; post-commit identifiers must be looked up via
//!   [`Storage::scan`].
//!
//! Foreign-key references inside a transaction work because geomerge's
//! `TxnCellValue::Id(RowRef::Pending(TempRowId(counter)))` accepts the
//! counter the adapter tracked locally. The local counter is assumed to
//! match geomerge's internal `TempRowId` counter (both start at 0 and
//! increment per `Transaction::add`).

use std::collections::{HashMap, HashSet}; use geomerge::commit::hash::CommitHash; use geomerge::ir::{self, Path}; use geomerge::store::Store; use geomerge::table::{CellValue, RowId as GmRowId}; use geomerge::txn::ops::{RowRef, TempRowId, TxnCellValue}; use crate::id::RowId; use crate::value::Value; use crate::{CommittedTx, RowStream, Storage, StorageError, Transaction, backend}; const GM_ROW_ID_LEN: usize = 32 + 4; const PENDING_ROW_ID_LEN: usize = 4; fn validation(msg: impl Into) -> StorageError { StorageError::Validation(msg.into()) } fn encode_gm_row_id(id: &GmRowId) -> RowId { let mut bytes = [0u8; GM_ROW_ID_LEN]; bytes[..32].copy_from_slice(&id.commit.0); bytes[32..].copy_from_slice(&id.counter.to_be_bytes()); RowId::new(bytes) } fn decode_gm_row_id(bytes: &[u8]) -> Result { if bytes.len() != GM_ROW_ID_LEN { return Err(validation(format!( "expected {GM_ROW_ID_LEN}-byte geomerge RowId, got {} bytes", bytes.len() ))); } let mut hash = [0u8; 32]; hash.copy_from_slice(&bytes[..32]); let mut counter_buf = [0u8; 4]; counter_buf.copy_from_slice(&bytes[32..]); Ok(GmRowId { commit: CommitHash(hash), counter: u32::from_be_bytes(counter_buf), }) } fn encode_pending_row_id(counter: u32) -> RowId { RowId::new(counter.to_be_bytes()) } fn decode_pending_row_id(bytes: &[u8]) -> Result { if bytes.len() != PENDING_ROW_ID_LEN { return Err(validation(format!( "expected {PENDING_ROW_ID_LEN}-byte pending RowId, got {} bytes", bytes.len() ))); } let mut counter_buf = [0u8; 4]; counter_buf.copy_from_slice(bytes); Ok(TempRowId::from(u32::from_be_bytes(counter_buf))) } /// Geomerge-backed [`Storage`] implementation. pub struct GeomergeStorage { store: Store, declared: HashSet, } impl Default for GeomergeStorage { fn default() -> Self { Self::new() } } impl GeomergeStorage { /// Build an empty store. No relations are available until the store is /// rebuilt via a theory. 
#[must_use] pub fn new() -> Self { Self { store: Store::new(), declared: HashSet::new(), } } /// Build a store from a pre-defined `FlatTheory`. All `create_relation` /// calls must reference relations declared in the theory. /// /// # Errors /// Returns [`StorageError::Backend`] if geomerge rejects the theory. pub fn from_theory(theory: ir::FlatTheory) -> Result { let store = Store::try_from_theory(theory).map_err(|e| backend(*e))?; Ok(Self { store, declared: HashSet::new(), }) } /// Wrap an existing `Store`, e.g. after decoding via /// `geomerge::commit::pst::decode_store`. #[must_use] pub fn from_store(store: Store) -> Self { Self { store, declared: HashSet::new(), } } /// Borrow the underlying geomerge store (for backend-specific operations /// like persistence, dump, or law inspection that aren't on the trait). #[must_use] pub fn store(&self) -> &Store { &self.store } } impl Storage for GeomergeStorage { fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> { if self.declared.contains(name) { return Err(StorageError::RelationExists(name.to_string())); } let path: Path = name.into(); let table = self.store.table_at(&path).ok_or_else(|| { validation(format!( "relation '{name}' is not declared in the loaded geomerge theory; \ geomerge does not support runtime relation creation" )) })?; let declared_arity = table.schema().columns.len(); if declared_arity != arity { return Err(StorageError::ArityMismatch { expected: declared_arity, got: arity, }); } self.declared.insert(name.to_string()); Ok(()) } fn arity(&self, name: &str) -> Result { let path: Path = name.into(); let table = self .store .table_at(&path) .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; Ok(table.schema().columns.len()) } fn scan_iter<'a>(&'a self, name: &str) -> Result, StorageError> { let path: Path = name.into(); let table = self .store .table_at(&path) .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; let arity = 
table.schema().columns.len(); let mut rows = Vec::with_capacity(table.row_count()); for r in 0..table.row_count() { let gm_id = table .row_id_at(r) .ok_or_else(|| validation(format!("missing row id at {r} in '{name}'")))?; let id = encode_gm_row_id(&gm_id); let mut row = Vec::with_capacity(arity); for c in 0..arity { let cell = table .cell_at(r, c) .ok_or_else(|| validation(format!("missing cell at ({r}, {c}) in '{name}'")))?; row.push(cell_to_value(cell)); } rows.push((id, row)); } Ok(Box::new(rows.into_iter().map(Ok))) } fn transaction<'a>(&'a mut self) -> Result, StorageError> { let txn = self.store.transaction(); Ok(Box::new(GeomergeTx { txn: Some(txn), counter: 0, })) } } pub(crate) struct GeomergeTx<'a> { txn: Option>, /// Mirrors geomerge's internal `TempRowId` counter for this transaction. /// Both start at 0 and increment by 1 per `Transaction::add`. counter: u32, } impl Transaction for GeomergeTx<'_> { fn insert(&mut self, name: &str, row: Vec) -> Result { let path: Path = name.into(); let values: Vec = row .into_iter() .map(value_to_txn_cell) .collect::, _>>()?; let Some(txn) = self.txn.as_mut() else { unreachable!("transaction was already committed") }; txn.add(&path, values) .map_err(|e| validation(e.to_string()))?; let id = encode_pending_row_id(self.counter); self.counter += 1; Ok(id) } fn commit(self: Box) -> Result { let mut this = self; let Some(txn) = this.txn.take() else { unreachable!("transaction was already committed") }; // Law violations (totality, foreign-key, etc.) surface here. let commit_hash = txn.commit().map_err(|e| validation(e.to_string()))?; // Every counter we returned during this tx (0..self.counter) now // corresponds to a real RowId { commit: commit_hash, counter }. 
let mut resolutions = HashMap::with_capacity(this.counter as usize); for counter in 0..this.counter { let pending = encode_pending_row_id(counter); let real = encode_gm_row_id(&GmRowId { commit: commit_hash, counter, }); resolutions.insert(pending, real); } Ok(CommittedTx::from_mappings(resolutions)) } } fn cell_to_value(cell: &CellValue) -> Value { match cell { CellValue::Int(i) => Value::Int(*i), CellValue::Str(s) => Value::Str(s.clone()), CellValue::Id(id) => Value::Id(encode_gm_row_id(id)), } } fn value_to_txn_cell(value: Value) -> Result { match value { Value::Int(i) => Ok(TxnCellValue::Int(i)), Value::Str(s) => Ok(TxnCellValue::Str(s)), Value::Id(id) => { let bytes = id.as_bytes(); match bytes.len() { PENDING_ROW_ID_LEN => Ok(TxnCellValue::Id(RowRef::Pending(decode_pending_row_id( bytes, )?))), GM_ROW_ID_LEN => Ok(TxnCellValue::Id(RowRef::Existing(decode_gm_row_id(bytes)?))), len => Err(validation(format!( "geomerge RowId must be {PENDING_ROW_ID_LEN} (pending) or \ {GM_ROW_ID_LEN} (existing) bytes, got {len}" ))), } } } } #[cfg(test)] mod tests { use super::*; use geomerge::ir::{ColType, FlatTheory, PrimType, Schema, TableEntry}; fn i(x: i64) -> Value { Value::Int(x) } fn int_schema(arity: usize) -> Schema { Schema { columns: (0..arity) .map(|_| ColType::PrimType { prim: PrimType::PrimInt, }) .collect(), primary_key: None, } } fn theory_with_one_int_table(name: &str, arity: usize) -> FlatTheory { FlatTheory { tables: vec![TableEntry { path: name.into(), table: int_schema(arity), }], laws: Vec::new(), } } #[test] fn empty_store_has_no_relations() { let storage = GeomergeStorage::new(); assert!(matches!( storage.arity("edge"), Err(StorageError::RelationNotFound(_)) )); } #[test] fn create_relation_on_undeclared_returns_validation_error() { let mut storage = GeomergeStorage::new(); assert!(matches!( storage.create_relation("edge", 2), Err(StorageError::Validation(_)) )); } #[test] fn theory_loaded_insert_scan_roundtrip() -> Result<(), StorageError> { let theory 
= theory_with_one_int_table("edge", 2); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 2)?; storage.insert("edge", vec![i(1), i(2)])?; storage.insert("edge", vec![i(3), i(4)])?; let rows = storage.scan("edge")?; assert_eq!(rows.len(), 2); assert_eq!(rows[0].1, vec![i(1), i(2)]); assert_eq!(rows[1].1, vec![i(3), i(4)]); // Scanned IDs use the 36-byte existing form. assert_eq!(rows[0].0.as_bytes().len(), GM_ROW_ID_LEN); assert_eq!(rows[1].0.as_bytes().len(), GM_ROW_ID_LEN); Ok(()) } #[test] fn single_storage_insert_returns_post_commit_row_id() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 1); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 1)?; // `Storage::insert` opens its own tx, commits, and resolves the // pending RowId to its post-commit form. The returned id should be // the 36-byte (existing) shape, not the 4-byte (pending) shape. let id = storage.insert("edge", vec![i(1)])?; assert_eq!(id.as_bytes().len(), GM_ROW_ID_LEN); // And it should equal what scan returns. let rows = storage.scan("edge")?; assert_eq!(rows.len(), 1); assert_eq!(rows[0].0, id); Ok(()) } #[test] fn committed_tx_resolves_pending_ids() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 1); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 1)?; let (committed, pending_a, pending_b) = { let mut tx = storage.transaction()?; let a = tx.insert("edge", vec![i(1)])?; let b = tx.insert("edge", vec![i(2)])?; (tx.commit()?, a, b) }; let real_a = committed.resolve(&pending_a); let real_b = committed.resolve(&pending_b); assert_eq!(real_a.as_bytes().len(), GM_ROW_ID_LEN); assert_eq!(real_b.as_bytes().len(), GM_ROW_ID_LEN); assert_ne!(real_a, real_b); // The resolved ids should match what scan reports. 
let rows = storage.scan("edge")?; assert!(rows.iter().any(|(id, _)| id == &real_a)); assert!(rows.iter().any(|(id, _)| id == &real_b)); Ok(()) } #[test] fn batched_inserts_in_one_transaction() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 2); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 2)?; { let mut tx = storage.transaction()?; let p0 = tx.insert("edge", vec![i(1), i(2)])?; let p1 = tx.insert("edge", vec![i(3), i(4)])?; // Pending ids are 4-byte counters within the tx. assert_eq!(p0.as_bytes().len(), PENDING_ROW_ID_LEN); assert_eq!(p1.as_bytes().len(), PENDING_ROW_ID_LEN); tx.commit()?; } let rows = storage.scan("edge")?; assert_eq!(rows.len(), 2); Ok(()) } #[test] fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 2); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 2)?; { let mut tx = storage.transaction()?; tx.insert("edge", vec![i(1), i(2)])?; } assert!(storage.scan("edge")?.is_empty()); Ok(()) } #[test] fn duplicate_create_returns_err() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 2); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 2)?; assert!(matches!( storage.create_relation("edge", 2), Err(StorageError::RelationExists(_)) )); Ok(()) } #[test] fn delete_is_not_supported() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 1); let mut storage = GeomergeStorage::from_theory(theory)?; storage.create_relation("edge", 1)?; let id = storage.insert("edge", vec![i(1)])?; let result = storage.delete("edge", &id); assert!(matches!(result, Err(StorageError::Unsupported(_)))); Ok(()) } #[test] fn insert_wrong_type_returns_validation_error() -> Result<(), StorageError> { let theory = theory_with_one_int_table("edge", 2); let mut storage = GeomergeStorage::from_theory(theory)?; 
storage.create_relation("edge", 2)?; let result = storage.insert("edge", vec![Value::Str("not an int".to_string()), i(2)]); assert!(matches!(result, Err(StorageError::Validation(_)))); Ok(()) } }