Compare commits

..

2 Commits

Author SHA1 Message Date
Hassan Abedi
b572161142 Use query-storage for storage access in other crates 2026-06-04 12:16:30 +02:00
Hassan Abedi
ed8c438135 Add a storage abstraction layer implmentation (query-storage) 2026-06-04 11:51:25 +02:00
19 changed files with 2681 additions and 13 deletions

1097
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,3 +9,4 @@ rust-version.workspace = true
workspace = true workspace = true
[dependencies] [dependencies]
query-storage = { path = "../query-storage" }

View File

@ -9,7 +9,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::{relation::Relation, table::Table, value::Value}; use query_storage::table::Table;
use query_storage::value::Value;
use crate::relation::Relation;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Term { pub enum Term {

View File

@ -11,7 +11,9 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use crate::{relation::Relation, value::Value}; use query_storage::value::Value;
use crate::relation::Relation;
fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> { fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> {
left.columns left.columns

View File

@ -2,8 +2,8 @@
//! //!
//! Three operators are in scope: //! Three operators are in scope:
//! //!
//! - [`atom::scan_atom`] scans a [`table::Table`] under an //! - [`atom::scan_atom`] scans a [`Table`](query_storage::table::Table) under
//! [`atom::AtomPattern`], filtering for repeated-variable equality and //! an [`atom::AtomPattern`], filtering for repeated-variable equality and
//! literal equality, and outputs a binding [`relation::Relation`]. //! literal equality, and outputs a binding [`relation::Relation`].
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values //! - [`join::semijoin`] keeps rows of one relation whose shared-column values
//! appear in another. //! appear in another.
@ -14,10 +14,11 @@
//! is just an expression like //! is just an expression like
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`. //! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
//! //!
//! Integration with an external query-plan IR is out of scope. //! Foundational types [`Value`](query_storage::value::Value) and
//! [`Table`](query_storage::table::Table) live in `query-storage`, the
//! storage-layer crate this crate is built on; storage backends produce
//! `Table`s that operators here consume.
pub mod atom; pub mod atom;
pub mod join; pub mod join;
pub mod relation; pub mod relation;
pub mod table;
pub mod value;

View File

@ -10,7 +10,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use crate::value::Value; use query_storage::value::Value;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Relation { pub struct Relation {

View File

@ -15,8 +15,8 @@
use query_ops::atom::{AtomPattern, Term, scan_atom}; use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_ops::join::{natural_join, semijoin}; use query_ops::join::{natural_join, semijoin};
use query_ops::table::Table; use query_storage::table::Table;
use query_ops::value::Value; use query_storage::value::Value;
fn s(x: &str) -> Value { fn s(x: &str) -> Value {
Value::Str(x.to_string()) Value::Str(x.to_string())

View File

@ -0,0 +1,40 @@
//! End-to-end: load rows into [`MemoryStorage`], scan as a [`Table`],
//! run [`scan_atom`] against it.
//!
//! Demonstrates that `query-ops` operators can consume from a storage backend
//! through the [`scan_as_table`] bridge, with no changes to `query-ops` itself.
use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_storage::table::Table;
use query_storage::value::Value;
use query_storage::{MemoryStorage, Storage, StorageError, scan_as_table};
fn i(x: i64) -> Value {
Value::Int(x)
}
fn var(name: &str) -> Term {
Term::Var(name.to_string())
}
#[test]
fn scan_atom_consumes_from_memory_backend() -> Result<(), StorageError> {
let mut storage = MemoryStorage::new();
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(2), i(2)])?;
storage.insert("edge", vec![i(3), i(3)])?;
storage.insert("edge", vec![i(4), i(5)])?;
let edge_table: Table = scan_as_table(&storage, "edge")?;
let self_loops = scan_atom(
&edge_table,
&AtomPattern {
columns: vec![var("X"), var("X")],
},
);
assert_eq!(self_loops.columns, vec!["X".to_string()]);
assert_eq!(self_loops.rows, vec![vec![i(2)], vec![i(3)]]);
Ok(())
}

View File

@ -0,0 +1,30 @@
[package]
name = "query-storage"
version = "0.1.0"
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[lints.rust]
unsafe_code = "deny"
[lints.clippy]
pedantic = "warn"
[features]
default = []
lmdb = ["dep:heed"]
redb = ["dep:redb"]
fjall = ["dep:fjall"]
sled = ["dep:sled"]
geomerge = ["dep:geomerge"]
[dependencies]
heed = { version = "0.20", optional = true }
redb = { version = "2", optional = true }
fjall = { version = "2", optional = true }
sled = { version = "0.34", optional = true }
geomerge = { path = "../../external/geomerge/crates/geomerge", optional = true }
[dev-dependencies]
tempfile = "3"

View File

@ -0,0 +1,262 @@
//! Wire format shared by every byte-oriented backend in this crate.
//!
//! The encoding is hand-rolled (no `serde`, no `bincode`) so that the
//! generated bytes are stable and inspectable. It is **not** versioned: adding
//! a new [`Value`] variant invalidates previously-stored data. That is fine
//! for a playground; production code would prepend a format byte.
//!
//! ## Row Format
//!
//! `[count: u32 LE] [val × count]`
//!
//! ## Value Format
//!
//! `[tag: u8] [payload]`
//!
//! | Tag | Variant | Payload |
//! |--------|---------------|--------------------------------------|
//! | `0x00` | `Value::Int` | `i64 LE` (8 bytes) |
//! | `0x01` | `Value::Str` | `[len: u32 LE] [bytes]` |
//!
//! ## Row Key Format
//!
//! Synthetic row IDs are `u64` encoded big-endian so lexicographic key order
//! matches insertion order. Backends with named sub-stores per relation can
//! use this directly as the key.
//!
//! ## Metadata Format
//!
//! Per-relation metadata is `[arity: u32 LE] [next_id: u64 LE]` = 12 bytes.
use crate::value::Value;
/// Errors raised by [`decode_row`] and [`decode_meta`].
#[derive(Debug)]
pub enum CodecError {
/// The byte slice ended before the expected number of fields was read.
UnexpectedEof,
/// A value tag byte was unrecognized.
UnknownTag(u8),
/// A length field declared more bytes than the slice contains.
LengthOverrun { declared: usize, available: usize },
/// A UTF-8 string payload could not be decoded.
InvalidUtf8,
}
impl std::fmt::Display for CodecError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnexpectedEof => write!(f, "unexpected end of bytes"),
Self::UnknownTag(t) => write!(f, "unknown value tag: 0x{t:02x}"),
Self::LengthOverrun {
declared,
available,
} => write!(
f,
"declared length {declared} exceeds available {available} bytes"
),
Self::InvalidUtf8 => write!(f, "invalid UTF-8 in string payload"),
}
}
}
impl std::error::Error for CodecError {}
/// Encode a row of [`Value`]s to bytes.
#[must_use]
pub fn encode_row(row: &[Value]) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + row.len() * 9);
out.extend_from_slice(&u32::try_from(row.len()).unwrap_or(u32::MAX).to_le_bytes());
for value in row {
match value {
Value::Int(i) => {
out.push(0x00);
out.extend_from_slice(&i.to_le_bytes());
}
Value::Str(s) => {
out.push(0x01);
let bytes = s.as_bytes();
out.extend_from_slice(
&u32::try_from(bytes.len()).unwrap_or(u32::MAX).to_le_bytes(),
);
out.extend_from_slice(bytes);
}
}
}
out
}
/// Decode a row of [`Value`]s from bytes.
///
/// # Errors
/// Returns [`CodecError`] if the byte slice is malformed.
pub fn decode_row(mut bytes: &[u8]) -> Result<Vec<Value>, CodecError> {
let count = read_u32(&mut bytes)? as usize;
let mut row = Vec::with_capacity(count);
for _ in 0..count {
row.push(read_value(&mut bytes)?);
}
Ok(row)
}
fn read_value(bytes: &mut &[u8]) -> Result<Value, CodecError> {
let tag = read_u8(bytes)?;
match tag {
0x00 => {
let i = read_i64(bytes)?;
Ok(Value::Int(i))
}
0x01 => {
let len = read_u32(bytes)? as usize;
if bytes.len() < len {
return Err(CodecError::LengthOverrun {
declared: len,
available: bytes.len(),
});
}
let (head, tail) = bytes.split_at(len);
*bytes = tail;
let s = std::str::from_utf8(head)
.map_err(|_| CodecError::InvalidUtf8)?
.to_string();
Ok(Value::Str(s))
}
other => Err(CodecError::UnknownTag(other)),
}
}
fn read_u8(bytes: &mut &[u8]) -> Result<u8, CodecError> {
let (head, tail) = bytes.split_first().ok_or(CodecError::UnexpectedEof)?;
*bytes = tail;
Ok(*head)
}
fn read_u32(bytes: &mut &[u8]) -> Result<u32, CodecError> {
if bytes.len() < 4 {
return Err(CodecError::UnexpectedEof);
}
let (head, tail) = bytes.split_at(4);
*bytes = tail;
let mut buf = [0u8; 4];
buf.copy_from_slice(head);
Ok(u32::from_le_bytes(buf))
}
fn read_u64(bytes: &mut &[u8]) -> Result<u64, CodecError> {
if bytes.len() < 8 {
return Err(CodecError::UnexpectedEof);
}
let (head, tail) = bytes.split_at(8);
*bytes = tail;
let mut buf = [0u8; 8];
buf.copy_from_slice(head);
Ok(u64::from_le_bytes(buf))
}
fn read_i64(bytes: &mut &[u8]) -> Result<i64, CodecError> {
if bytes.len() < 8 {
return Err(CodecError::UnexpectedEof);
}
let (head, tail) = bytes.split_at(8);
*bytes = tail;
let mut buf = [0u8; 8];
buf.copy_from_slice(head);
Ok(i64::from_le_bytes(buf))
}
/// Encode a row key from a synthetic u64 ID.
///
/// Big-endian so lexicographic key order matches insertion order.
#[must_use]
pub fn row_key(id: u64) -> [u8; 8] {
id.to_be_bytes()
}
/// Encode per-relation metadata: arity and next row ID.
#[must_use]
pub fn encode_meta(arity: u32, next_id: u64) -> [u8; 12] {
let mut out = [0u8; 12];
out[0..4].copy_from_slice(&arity.to_le_bytes());
out[4..12].copy_from_slice(&next_id.to_le_bytes());
out
}
/// Decode per-relation metadata.
///
/// # Errors
/// Returns [`CodecError::UnexpectedEof`] if the slice is shorter than 12 bytes.
pub fn decode_meta(mut bytes: &[u8]) -> Result<(u32, u64), CodecError> {
let arity = read_u32(&mut bytes)?;
let next_id = read_u64(&mut bytes)?;
Ok((arity, next_id))
}
#[cfg(test)]
mod tests {
use super::*;
fn i(x: i64) -> Value {
Value::Int(x)
}
fn s(x: &str) -> Value {
Value::Str(x.to_string())
}
#[test]
fn encode_decode_int_only_row() -> Result<(), CodecError> {
let row = vec![i(1), i(-2), i(i64::MAX)];
let bytes = encode_row(&row);
let decoded = decode_row(&bytes)?;
assert_eq!(decoded, row);
Ok(())
}
#[test]
fn encode_decode_mixed_row() -> Result<(), CodecError> {
let row = vec![s("Alice"), i(42), s("a longer string with spaces")];
let bytes = encode_row(&row);
let decoded = decode_row(&bytes)?;
assert_eq!(decoded, row);
Ok(())
}
#[test]
fn encode_decode_empty_row() -> Result<(), CodecError> {
let bytes = encode_row(&[]);
let decoded = decode_row(&bytes)?;
assert!(decoded.is_empty());
Ok(())
}
#[test]
fn decode_unknown_tag_fails() {
let bytes = vec![1, 0, 0, 0, 0xFF];
assert!(matches!(
decode_row(&bytes),
Err(CodecError::UnknownTag(0xFF))
));
}
#[test]
fn decode_truncated_fails() {
let bytes = vec![1, 0, 0, 0, 0x00, 0x01];
assert!(matches!(decode_row(&bytes), Err(CodecError::UnexpectedEof)));
}
#[test]
fn row_key_preserves_order() {
assert!(row_key(1) < row_key(2));
assert!(row_key(255) < row_key(256));
assert!(row_key(u64::MAX - 1) < row_key(u64::MAX));
}
#[test]
fn meta_roundtrip() -> Result<(), CodecError> {
let encoded = encode_meta(3, 12345);
let (arity, next_id) = decode_meta(&encoded)?;
assert_eq!(arity, 3);
assert_eq!(next_id, 12345);
Ok(())
}
}

View File

@ -0,0 +1,149 @@
//! fjall adapter.
//!
//! Each relation gets a fjall [`PartitionHandle`](fjall::PartitionHandle) of
//! the same name. A reserved partition named `__meta` carries per-relation
//! metadata (arity and next synthetic row ID).
use fjall::{Keyspace, PartitionCreateOptions, PartitionHandle};
use crate::value::Value;
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
use crate::{Storage, StorageError};
const META_PARTITION: &str = "__meta";
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
StorageError::Backend(Box::new(err))
}
/// fjall-backed [`Storage`] implementation.
pub struct FjallStorage {
keyspace: Keyspace,
meta: PartitionHandle,
}
impl FjallStorage {
/// Open or create a fjall keyspace at `path`.
///
/// # Errors
/// Returns [`StorageError::Backend`] if fjall fails to open the path.
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
let keyspace = fjall::Config::new(path).open().map_err(backend)?;
let meta = keyspace
.open_partition(META_PARTITION, PartitionCreateOptions::default())
.map_err(backend)?;
Ok(Self { keyspace, meta })
}
fn relation_partition(&self, name: &str) -> Result<PartitionHandle, StorageError> {
self.keyspace
.open_partition(name, PartitionCreateOptions::default())
.map_err(backend)
}
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
let raw = self
.meta
.get(name.as_bytes())
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
Ok(decode_meta(raw.as_ref())?)
}
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
self.meta
.insert(name.as_bytes(), encode_meta(arity, next_id))
.map_err(backend)?;
Ok(())
}
}
impl Storage for FjallStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if name == META_PARTITION {
return Err(StorageError::Validation(format!(
"relation name '{name}' is reserved"
)));
}
if self.meta.contains_key(name.as_bytes()).map_err(backend)? {
return Err(StorageError::RelationExists(name.to_string()));
}
let arity_u32 = u32::try_from(arity)
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
self.store_meta(name, arity_u32, 0)?;
let _ = self.relation_partition(name)?;
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
let (arity, _) = self.load_meta(name)?;
Ok(arity as usize)
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
let _ = self.load_meta(name)?;
let partition = self.relation_partition(name)?;
let mut rows = Vec::new();
for entry in partition.iter() {
let (_, value) = entry.map_err(backend)?;
rows.push(decode_row(value.as_ref())?);
}
Ok(rows)
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let (arity, next_id) = self.load_meta(name)?;
if row.len() != arity as usize {
return Err(StorageError::ArityMismatch {
expected: arity as usize,
got: row.len(),
});
}
let partition = self.relation_partition(name)?;
partition
.insert(row_key(next_id), encode_row(&row))
.map_err(backend)?;
self.store_meta(name, arity, next_id + 1)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn i(x: i64) -> Value {
Value::Int(x)
}
fn open_temp() -> Result<FjallStorage, StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
let storage = FjallStorage::open(dir.path())?;
std::mem::forget(dir);
Ok(storage)
}
#[test]
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(2), i(3)])?;
let rows = storage.scan("edge")?;
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
assert_eq!(storage.arity("edge")?, 2);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
}

View File

@ -0,0 +1,252 @@
//! Geomerge adapter.
//!
//! Unlike the other backends, geomerge schemas are **immutable after store
//! construction**: there is no public API to register a new table on a live
//! `Store`. The adapter therefore expects all relations to be declared up
//! front via a `FlatTheory` passed to [`GeomergeStorage::from_theory`], and
//! [`Storage::create_relation`] becomes a verifier that the relation exists
//! in the loaded theory and that its arity matches.
//!
//! Additional v1 mismatches with the trait:
//!
//! - Column types are typed (`PrimInt` / `PrimString`) in geomerge but the
//! trait's `create_relation` only carries `arity`. The adapter cannot
//! declare a relation at runtime, so this issue surfaces only at insert
//! time when geomerge rejects a row with `StorageError::Validation`.
//! - Cells of type `CellValue::Id` cannot be represented in our `Value` enum.
//! Scanning a table that contains such cells returns `StorageError::Validation`.
//! - Every `insert` opens a fresh `Transaction` and commits. Law violations
//! surface at commit time, not at the `add` call.
use std::collections::HashSet;
use geomerge::ir::{self, Path};
use geomerge::store::Store;
use geomerge::table::CellValue;
use geomerge::txn::ops::TxnCellValue;
use crate::value::Value;
use crate::{Storage, StorageError};
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
StorageError::Backend(Box::new(err))
}
fn validation(msg: impl Into<String>) -> StorageError {
StorageError::Validation(msg.into())
}
/// Geomerge-backed [`Storage`] implementation.
///
/// Construct via [`GeomergeStorage::new`] (empty store, no relations) or
/// [`GeomergeStorage::from_theory`] (preloaded with a `FlatTheory`).
pub struct GeomergeStorage {
store: Store,
declared: HashSet<String>,
}
impl Default for GeomergeStorage {
fn default() -> Self {
Self::new()
}
}
impl GeomergeStorage {
/// Build an empty store. No relations are available until the store is
/// rebuilt via a theory.
#[must_use]
pub fn new() -> Self {
Self {
store: Store::new(),
declared: HashSet::new(),
}
}
/// Build a store from a pre-defined `FlatTheory`. All `create_relation`
/// calls must reference relations declared in the theory.
///
/// # Errors
/// Returns [`StorageError::Backend`] if geomerge rejects the theory.
pub fn from_theory(theory: ir::FlatTheory) -> Result<Self, StorageError> {
let store = Store::try_from_theory(theory).map_err(|e| backend(*e))?;
Ok(Self {
store,
declared: HashSet::new(),
})
}
}
impl Storage for GeomergeStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if self.declared.contains(name) {
return Err(StorageError::RelationExists(name.to_string()));
}
let path: Path = name.into();
let table = self.store.table_at(&path).ok_or_else(|| {
validation(format!(
"relation '{name}' is not declared in the loaded geomerge theory; \
geomerge does not support runtime relation creation"
))
})?;
let declared_arity = table.schema().columns.len();
if declared_arity != arity {
return Err(StorageError::ArityMismatch {
expected: declared_arity,
got: arity,
});
}
self.declared.insert(name.to_string());
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
let path: Path = name.into();
let table = self
.store
.table_at(&path)
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
Ok(table.schema().columns.len())
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
let path: Path = name.into();
let table = self
.store
.table_at(&path)
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let arity = table.schema().columns.len();
let mut rows = Vec::with_capacity(table.row_count());
for r in 0..table.row_count() {
let mut row = Vec::with_capacity(arity);
for c in 0..arity {
let cell = table
.cell_at(r, c)
.ok_or_else(|| validation(format!("missing cell at ({r}, {c}) in '{name}'")))?;
row.push(cell_to_value(cell)?);
}
rows.push(row);
}
Ok(rows)
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let path: Path = name.into();
let arity = self.arity(name)?;
if row.len() != arity {
return Err(StorageError::ArityMismatch {
expected: arity,
got: row.len(),
});
}
let values: Vec<TxnCellValue> = row.into_iter().map(value_to_txn_cell).collect();
let mut txn = self.store.transaction();
txn.add(&path, values)
.map_err(|e| validation(e.to_string()))?;
// Law violations surface here at commit time, not at add time.
txn.commit().map_err(|e| validation(e.to_string()))?;
Ok(())
}
}
fn cell_to_value(cell: &CellValue) -> Result<Value, StorageError> {
match cell {
CellValue::Int(i) => Ok(Value::Int(*i)),
CellValue::Str(s) => Ok(Value::Str(s.clone())),
CellValue::Id(_) => Err(validation(
"geomerge CellValue::Id cannot be represented in the playground's Value enum",
)),
}
}
fn value_to_txn_cell(value: Value) -> TxnCellValue {
match value {
Value::Int(i) => TxnCellValue::Int(i),
Value::Str(s) => TxnCellValue::Str(s),
}
}
#[cfg(test)]
mod tests {
use super::*;
use geomerge::ir::{ColType, FlatTheory, PrimType, Schema, TableEntry};
fn i(x: i64) -> Value {
Value::Int(x)
}
fn int_schema(arity: usize) -> Schema {
Schema {
columns: (0..arity)
.map(|_| ColType::PrimType {
prim: PrimType::PrimInt,
})
.collect(),
primary_key: None,
}
}
fn theory_with_one_int_table(name: &str, arity: usize) -> FlatTheory {
FlatTheory {
tables: vec![TableEntry {
path: name.into(),
table: int_schema(arity),
}],
laws: Vec::new(),
}
}
#[test]
fn empty_store_has_no_relations() {
let storage = GeomergeStorage::new();
assert!(matches!(
storage.arity("edge"),
Err(StorageError::RelationNotFound(_))
));
}
#[test]
fn create_relation_on_undeclared_returns_validation_error() {
let mut storage = GeomergeStorage::new();
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::Validation(_))
));
}
#[test]
fn theory_loaded_insert_scan_roundtrip() -> Result<(), StorageError> {
let theory = theory_with_one_int_table("edge", 2);
let mut storage = GeomergeStorage::from_theory(theory)?;
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(3), i(4)])?;
let rows = storage.scan("edge")?;
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(3), i(4)]]);
assert_eq!(storage.arity("edge")?, 2);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let theory = theory_with_one_int_table("edge", 2);
let mut storage = GeomergeStorage::from_theory(theory)?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
#[test]
fn insert_wrong_type_returns_validation_error() -> Result<(), StorageError> {
let theory = theory_with_one_int_table("edge", 2);
let mut storage = GeomergeStorage::from_theory(theory)?;
storage.create_relation("edge", 2)?;
// Insert a Str into an Int column: geomerge rejects it.
let result = storage.insert("edge", vec![Value::Str("not an int".to_string()), i(2)]);
assert!(matches!(result, Err(StorageError::Validation(_))));
Ok(())
}
}

View File

@ -0,0 +1,145 @@
//! Storage layer for the query-plan playground.
//!
//! This is the foundational crate of the workspace. It owns the [`Value`] cell
//! type and the [`Table`] container, defines the [`Storage`] trait, and ships
//! adapters for several backends behind Cargo features. Higher-level crates
//! such as `query-ops` depend on this crate for both the types and the trait.
//!
//! The v1 trait surface is deliberately narrow: create a relation, scan all
//! rows, insert a row, ask for arity. Transactions, range scans, deletes, and
//! delta streams are not modeled yet, and will be added when a specific
//! experiment demands them.
//!
//! ## Backends
//!
//! [`MemoryStorage`] is always available. Other backends are gated behind
//! Cargo features so users only pay for what they need:
//!
//! - `lmdb` — LMDB via the `heed` crate
//! - `redb` — pure-Rust embedded KV
//! - `fjall` — pure-Rust LSM-tree
//! - `sled` — pure-Rust LSM-tree
//! - `geomerge` — the workspace's `geomerge` crate
use crate::table::Table;
use crate::value::Value;
pub mod codec;
pub mod memory;
pub mod table;
pub mod value;
#[cfg(feature = "sled")]
pub mod sled;
#[cfg(feature = "redb")]
pub mod redb;
#[cfg(feature = "fjall")]
pub mod fjall;
#[cfg(feature = "lmdb")]
pub mod lmdb;
#[cfg(feature = "geomerge")]
pub mod geomerge;
pub use memory::MemoryStorage;
/// Errors returned by a [`Storage`] backend.
///
/// Backend-specific failures (LMDB transaction aborts, sled I/O errors, etc.)
/// are wrapped in [`StorageError::Backend`].
#[derive(Debug)]
pub enum StorageError {
/// No relation with the given name exists in this backend.
RelationNotFound(String),
/// A relation with the given name already exists.
RelationExists(String),
/// A row was offered with the wrong number of columns.
ArityMismatch { expected: usize, got: usize },
/// A backend-defined validation rule rejected the operation, for example
/// a `geomerge` law violation.
Validation(String),
/// A row decoded from storage was malformed.
Decode(codec::CodecError),
/// A backend-specific error wrapped for transport across the trait.
Backend(Box<dyn std::error::Error + Send + Sync>),
}
impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RelationNotFound(name) => write!(f, "relation not found: {name}"),
Self::RelationExists(name) => write!(f, "relation already exists: {name}"),
Self::ArityMismatch { expected, got } => {
write!(f, "arity mismatch: expected {expected}, got {got}")
}
Self::Validation(msg) => write!(f, "validation failed: {msg}"),
Self::Decode(err) => write!(f, "decode error: {err}"),
Self::Backend(err) => write!(f, "backend error: {err}"),
}
}
}
impl std::error::Error for StorageError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Backend(err) => Some(err.as_ref()),
Self::Decode(err) => Some(err),
_ => None,
}
}
}
impl From<codec::CodecError> for StorageError {
fn from(err: codec::CodecError) -> Self {
Self::Decode(err)
}
}
/// Backend-agnostic interface for storing and retrieving rows.
///
/// Each relation has a fixed name, a fixed arity (row width), and an ordered
/// collection of rows whose cells are [`Value`]s. Concrete implementations
/// include [`MemoryStorage`] in this crate plus the feature-gated backends.
pub trait Storage {
/// Create a new relation with the given name and arity.
///
/// # Errors
/// Returns [`StorageError::RelationExists`] if a relation with the given
/// name already exists.
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError>;
/// Return the arity of the given relation.
///
/// # Errors
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
fn arity(&self, name: &str) -> Result<usize, StorageError>;
/// Scan all rows of the given relation in storage order.
///
/// # Errors
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError>;
/// Append a row to the given relation.
///
/// # Errors
/// Returns [`StorageError::RelationNotFound`] if no such relation exists,
/// [`StorageError::ArityMismatch`] if the row's length differs from the
/// declared arity, or [`StorageError::Validation`] / [`StorageError::Backend`]
/// if a backend-specific rule rejects the row.
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError>;
}
/// Materialize a relation from a [`Storage`] backend as a [`Table`] that
/// query-language operators can consume.
///
/// # Errors
/// Returns any error produced by [`Storage::arity`] or [`Storage::scan`].
pub fn scan_as_table(storage: &dyn Storage, name: &str) -> Result<Table, StorageError> {
let arity = storage.arity(name)?;
let rows = storage.scan(name)?;
Ok(Table::from_rows(arity, rows))
}

View File

@ -0,0 +1,201 @@
//! LMDB adapter via the `heed` crate.
//!
//! Maps each relation onto a named LMDB sub-database of the same name. A
//! reserved sub-database named `__meta` carries per-relation metadata (arity
//! and next synthetic row ID).
//!
//! Note: every [`Storage::insert`] opens its own write transaction. LMDB
//! serializes writers across the env, so per-row inserts will be slow on real
//! workloads. The v1 trait does not yet expose batch inserts.
use heed::types::Bytes;
use heed::{Database, Env, EnvOpenOptions};
use crate::value::Value;
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
use crate::{Storage, StorageError};
const META_DB: &str = "__meta";
const DEFAULT_MAX_DBS: u32 = 128;
const DEFAULT_MAP_SIZE: usize = 100 * 1024 * 1024;
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
StorageError::Backend(Box::new(err))
}
/// LMDB-backed [`Storage`] implementation.
pub struct LmdbStorage {
env: Env,
meta: Database<Bytes, Bytes>,
}
impl LmdbStorage {
/// Open or create an LMDB environment at `path`.
///
/// The path must already exist as a directory; LMDB will create its data
/// files inside it.
///
/// # Errors
/// Returns [`StorageError::Backend`] if LMDB fails to open.
///
/// # Safety
/// Internally uses `EnvOpenOptions::open`, which heed marks `unsafe`
/// because the memory-mapped file's contents can be modified by other
/// processes. The adapter assumes single-process exclusive access.
#[allow(unsafe_code)]
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
// SAFETY: heed marks `open` unsafe because the mmap'd file's contents
// can be modified by other processes, violating Rust's aliasing rules.
// This adapter assumes single-process exclusive access to the path,
// which holds for tests and typical playground use.
let env = unsafe {
EnvOpenOptions::new()
.max_dbs(DEFAULT_MAX_DBS)
.map_size(DEFAULT_MAP_SIZE)
.open(path)
.map_err(backend)?
};
let mut wtxn = env.write_txn().map_err(backend)?;
let meta: Database<Bytes, Bytes> = env
.create_database(&mut wtxn, Some(META_DB))
.map_err(backend)?;
wtxn.commit().map_err(backend)?;
Ok(Self { env, meta })
}
fn open_relation_db(
&self,
wtxn: &mut heed::RwTxn,
name: &str,
) -> Result<Database<Bytes, Bytes>, StorageError> {
self.env.create_database(wtxn, Some(name)).map_err(backend)
}
}
impl Storage for LmdbStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if name == META_DB {
return Err(StorageError::Validation(format!(
"relation name '{name}' is reserved"
)));
}
let arity_u32 = u32::try_from(arity)
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
let mut wtxn = self.env.write_txn().map_err(backend)?;
if self
.meta
.get(&wtxn, name.as_bytes())
.map_err(backend)?
.is_some()
{
return Err(StorageError::RelationExists(name.to_string()));
}
let encoded = encode_meta(arity_u32, 0);
self.meta
.put(&mut wtxn, name.as_bytes(), &encoded[..])
.map_err(backend)?;
let _ = self.open_relation_db(&mut wtxn, name)?;
wtxn.commit().map_err(backend)?;
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
let rtxn = self.env.read_txn().map_err(backend)?;
let raw = self
.meta
.get(&rtxn, name.as_bytes())
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let (arity, _) = decode_meta(raw)?;
Ok(arity as usize)
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
let rtxn = self.env.read_txn().map_err(backend)?;
if self
.meta
.get(&rtxn, name.as_bytes())
.map_err(backend)?
.is_none()
{
return Err(StorageError::RelationNotFound(name.to_string()));
}
let db: Database<Bytes, Bytes> = self
.env
.open_database(&rtxn, Some(name))
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let mut rows = Vec::new();
for entry in db.iter(&rtxn).map_err(backend)? {
let (_, value) = entry.map_err(backend)?;
rows.push(decode_row(value)?);
}
Ok(rows)
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let mut wtxn = self.env.write_txn().map_err(backend)?;
let meta_bytes = self
.meta
.get(&wtxn, name.as_bytes())
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let (arity, next_id) = decode_meta(meta_bytes)?;
if row.len() != arity as usize {
return Err(StorageError::ArityMismatch {
expected: arity as usize,
got: row.len(),
});
}
let db = self.open_relation_db(&mut wtxn, name)?;
let key = row_key(next_id);
let value = encode_row(&row);
db.put(&mut wtxn, &key[..], &value[..]).map_err(backend)?;
let new_meta = encode_meta(arity, next_id + 1);
self.meta
.put(&mut wtxn, name.as_bytes(), &new_meta[..])
.map_err(backend)?;
wtxn.commit().map_err(backend)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn i(x: i64) -> Value {
Value::Int(x)
}
fn open_temp() -> Result<LmdbStorage, StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
let storage = LmdbStorage::open(dir.path())?;
std::mem::forget(dir);
Ok(storage)
}
#[test]
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(2), i(3)])?;
let rows = storage.scan("edge")?;
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
assert_eq!(storage.arity("edge")?, 2);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
}

View File

@ -0,0 +1,147 @@
//! In-memory backend, keyed by relation name. Always available.
use std::collections::HashMap;
use crate::value::Value;
use crate::{Storage, StorageError};
/// In-memory backend, useful as the default in tests and as a correctness
/// oracle for other backends.
#[derive(Debug, Default)]
pub struct MemoryStorage {
relations: HashMap<String, MemoryRelation>,
}
#[derive(Debug)]
struct MemoryRelation {
arity: usize,
rows: Vec<Vec<Value>>,
}
impl MemoryStorage {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl Storage for MemoryStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if self.relations.contains_key(name) {
return Err(StorageError::RelationExists(name.to_string()));
}
self.relations.insert(
name.to_string(),
MemoryRelation {
arity,
rows: Vec::new(),
},
);
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
self.relations
.get(name)
.map(|r| r.arity)
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
self.relations
.get(name)
.map(|r| r.rows.clone())
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let relation = self
.relations
.get_mut(name)
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
if row.len() != relation.arity {
return Err(StorageError::ArityMismatch {
expected: relation.arity,
got: row.len(),
});
}
relation.rows.push(row);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scan_as_table;
fn i(x: i64) -> Value {
Value::Int(x)
}
#[test]
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
let mut storage = MemoryStorage::new();
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(2), i(3)])?;
let rows = storage.scan("edge")?;
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let mut storage = MemoryStorage::new();
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
#[test]
fn scan_unknown_relation_returns_err() {
let storage = MemoryStorage::new();
assert!(matches!(
storage.scan("missing"),
Err(StorageError::RelationNotFound(_))
));
}
#[test]
fn arity_unknown_relation_returns_err() {
let storage = MemoryStorage::new();
assert!(matches!(
storage.arity("missing"),
Err(StorageError::RelationNotFound(_))
));
}
#[test]
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
let mut storage = MemoryStorage::new();
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.insert("edge", vec![i(1)]),
Err(StorageError::ArityMismatch {
expected: 2,
got: 1
})
));
Ok(())
}
#[test]
fn scan_as_table_materializes_table() -> Result<(), StorageError> {
let mut storage = MemoryStorage::new();
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
let table = scan_as_table(&storage, "edge")?;
assert_eq!(table.arity, 2);
assert_eq!(table.rows, vec![vec![i(1), i(2)]]);
Ok(())
}
}

View File

@ -0,0 +1,183 @@
//! redb adapter.
//!
//! Each relation gets a redb table named after it, keyed by `u64` row IDs.
//! A reserved table named `__meta`, keyed by relation name, carries per-relation
//! metadata (arity and next synthetic row ID).
use redb::{Database, ReadableTable, TableDefinition};
use crate::value::Value;
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row};
use crate::{Storage, StorageError};
const META_TABLE: &str = "__meta";
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
StorageError::Backend(Box::new(err))
}
fn meta_def() -> TableDefinition<'static, &'static str, &'static [u8]> {
TableDefinition::new(META_TABLE)
}
fn rows_def(name: &str) -> TableDefinition<'_, u64, &'static [u8]> {
TableDefinition::new(name)
}
/// redb-backed [`Storage`] implementation.
pub struct RedbStorage {
db: Database,
}
impl RedbStorage {
/// Open or create a redb database at `path`.
///
/// # Errors
/// Returns [`StorageError::Backend`] if redb fails to open the file.
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
let db = Database::create(path).map_err(backend)?;
Ok(Self { db })
}
}
impl Storage for RedbStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if name == META_TABLE {
return Err(StorageError::Validation(format!(
"relation name '{name}' is reserved"
)));
}
let arity_u32 = u32::try_from(arity)
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
let txn = self.db.begin_write().map_err(backend)?;
{
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
if meta.get(name).map_err(backend)?.is_some() {
return Err(StorageError::RelationExists(name.to_string()));
}
let encoded = encode_meta(arity_u32, 0);
meta.insert(name, &encoded[..]).map_err(backend)?;
// open_table creates the rows table if it does not exist
let _ = txn.open_table(rows_def(name)).map_err(backend)?;
}
txn.commit().map_err(backend)?;
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
let txn = self.db.begin_read().map_err(backend)?;
let meta = txn.open_table(meta_def()).map_err(backend)?;
let raw = meta
.get(name)
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let (arity, _) = decode_meta(raw.value())?;
Ok(arity as usize)
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
let txn = self.db.begin_read().map_err(backend)?;
let meta = txn.open_table(meta_def()).map_err(backend)?;
if meta.get(name).map_err(backend)?.is_none() {
return Err(StorageError::RelationNotFound(name.to_string()));
}
let table = txn.open_table(rows_def(name)).map_err(backend)?;
let mut rows = Vec::new();
for entry in table.iter().map_err(backend)? {
let (_, value) = entry.map_err(backend)?;
rows.push(decode_row(value.value())?);
}
Ok(rows)
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let txn = self.db.begin_write().map_err(backend)?;
let (arity, next_id) = {
let meta = txn.open_table(meta_def()).map_err(backend)?;
let entry = meta
.get(name)
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
decode_meta(entry.value())?
};
if row.len() != arity as usize {
return Err(StorageError::ArityMismatch {
expected: arity as usize,
got: row.len(),
});
}
{
let mut rows = txn.open_table(rows_def(name)).map_err(backend)?;
let encoded = encode_row(&row);
rows.insert(next_id, &encoded[..]).map_err(backend)?;
}
{
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
let new_meta = encode_meta(arity, next_id + 1);
meta.insert(name, new_meta.as_ref()).map_err(backend)?;
}
txn.commit().map_err(backend)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn i(x: i64) -> Value {
Value::Int(x)
}
fn s(x: &str) -> Value {
Value::Str(x.to_string())
}
fn open_temp() -> Result<RedbStorage, StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
// The file does not have to exist for redb::create.
let path = dir.path().join("test.redb");
let storage = RedbStorage::open(&path)?;
// Keep the tempdir alive by leaking it (test-only).
std::mem::forget(dir);
Ok(storage)
}
#[test]
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![s("hello"), i(7)])?;
let rows = storage.scan("edge")?;
assert_eq!(rows, vec![vec![i(1), i(2)], vec![s("hello"), i(7)]]);
assert_eq!(storage.arity("edge")?, 2);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
#[test]
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
let mut storage = open_temp()?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.insert("edge", vec![i(1)]),
Err(StorageError::ArityMismatch {
expected: 2,
got: 1,
})
));
Ok(())
}
}

View File

@ -0,0 +1,161 @@
//! Sled adapter.
//!
//! Maps each relation onto a sled [`Tree`](sled::Tree) of the same name. A
//! reserved tree named `__meta` carries per-relation metadata (arity and the
//! next synthetic row ID).
use crate::value::Value;
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
use crate::{Storage, StorageError};
const META_TREE: &str = "__meta";
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
StorageError::Backend(Box::new(err))
}
/// Sled-backed [`Storage`] implementation.
pub struct SledStorage {
db: sled::Db,
}
impl SledStorage {
/// Open or create a sled database at `path`.
///
/// # Errors
/// Returns [`StorageError::Backend`] if sled fails to open the path.
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
let db = sled::open(path).map_err(backend)?;
Ok(Self { db })
}
fn meta_tree(&self) -> Result<sled::Tree, StorageError> {
self.db.open_tree(META_TREE).map_err(backend)
}
fn relation_tree(&self, name: &str) -> Result<sled::Tree, StorageError> {
self.db.open_tree(name).map_err(backend)
}
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
let meta = self.meta_tree()?;
let raw = meta
.get(name.as_bytes())
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
Ok(decode_meta(raw.as_ref())?)
}
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
let meta = self.meta_tree()?;
let encoded = encode_meta(arity, next_id);
meta.insert(name.as_bytes(), encoded.as_ref())
.map_err(backend)?;
Ok(())
}
}
impl Storage for SledStorage {
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
if name == META_TREE {
return Err(StorageError::Validation(format!(
"relation name '{name}' is reserved"
)));
}
let meta = self.meta_tree()?;
if meta.contains_key(name.as_bytes()).map_err(backend)? {
return Err(StorageError::RelationExists(name.to_string()));
}
let arity_u32 = u32::try_from(arity)
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
self.store_meta(name, arity_u32, 0)?;
// open_tree creates the tree if it doesn't exist
let _ = self.relation_tree(name)?;
Ok(())
}
fn arity(&self, name: &str) -> Result<usize, StorageError> {
let (arity, _) = self.load_meta(name)?;
Ok(arity as usize)
}
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
let _ = self.load_meta(name)?;
let tree = self.relation_tree(name)?;
let mut rows = Vec::new();
for entry in &tree {
let (_, value) = entry.map_err(backend)?;
rows.push(decode_row(value.as_ref())?);
}
Ok(rows)
}
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
let (arity, next_id) = self.load_meta(name)?;
if row.len() != arity as usize {
return Err(StorageError::ArityMismatch {
expected: arity as usize,
got: row.len(),
});
}
let tree = self.relation_tree(name)?;
tree.insert(row_key(next_id), encode_row(&row))
.map_err(backend)?;
self.store_meta(name, arity, next_id + 1)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn i(x: i64) -> Value {
Value::Int(x)
}
#[test]
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
let mut storage = SledStorage::open(dir.path())?;
storage.create_relation("edge", 2)?;
storage.insert("edge", vec![i(1), i(2)])?;
storage.insert("edge", vec![i(2), i(3)])?;
storage.insert("edge", vec![i(3), i(3)])?;
let rows = storage.scan("edge")?;
assert_eq!(
rows,
vec![vec![i(1), i(2)], vec![i(2), i(3)], vec![i(3), i(3)],],
);
assert_eq!(storage.arity("edge")?, 2);
Ok(())
}
#[test]
fn duplicate_create_returns_err() -> Result<(), StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
let mut storage = SledStorage::open(dir.path())?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.create_relation("edge", 2),
Err(StorageError::RelationExists(_))
));
Ok(())
}
#[test]
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
let dir = tempfile::tempdir().map_err(backend)?;
let mut storage = SledStorage::open(dir.path())?;
storage.create_relation("edge", 2)?;
assert!(matches!(
storage.insert("edge", vec![i(1)]),
Err(StorageError::ArityMismatch {
expected: 2,
got: 1,
})
));
Ok(())
}
}