Compare commits
2 Commits
1848af6d32
...
b572161142
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b572161142 | ||
|
|
ed8c438135 |
1097
Cargo.lock
generated
1097
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -9,3 +9,4 @@ rust-version.workspace = true
|
|||||||
workspace = true
|
workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
query-storage = { path = "../query-storage" }
|
||||||
|
|||||||
@ -9,7 +9,10 @@
|
|||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::{relation::Relation, table::Table, value::Value};
|
use query_storage::table::Table;
|
||||||
|
use query_storage::value::Value;
|
||||||
|
|
||||||
|
use crate::relation::Relation;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum Term {
|
pub enum Term {
|
||||||
|
|||||||
@ -11,7 +11,9 @@
|
|||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
use crate::{relation::Relation, value::Value};
|
use query_storage::value::Value;
|
||||||
|
|
||||||
|
use crate::relation::Relation;
|
||||||
|
|
||||||
fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> {
|
fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> {
|
||||||
left.columns
|
left.columns
|
||||||
|
|||||||
@ -2,8 +2,8 @@
|
|||||||
//!
|
//!
|
||||||
//! Three operators are in scope:
|
//! Three operators are in scope:
|
||||||
//!
|
//!
|
||||||
//! - [`atom::scan_atom`] scans a [`table::Table`] under an
|
//! - [`atom::scan_atom`] scans a [`Table`](query_storage::table::Table) under
|
||||||
//! [`atom::AtomPattern`], filtering for repeated-variable equality and
|
//! an [`atom::AtomPattern`], filtering for repeated-variable equality and
|
||||||
//! literal equality, and outputs a binding [`relation::Relation`].
|
//! literal equality, and outputs a binding [`relation::Relation`].
|
||||||
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values
|
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values
|
||||||
//! appear in another.
|
//! appear in another.
|
||||||
@ -14,10 +14,11 @@
|
|||||||
//! is just an expression like
|
//! is just an expression like
|
||||||
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
|
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
|
||||||
//!
|
//!
|
||||||
//! Integration with an external query-plan IR is out of scope.
|
//! Foundational types [`Value`](query_storage::value::Value) and
|
||||||
|
//! [`Table`](query_storage::table::Table) live in `query-storage`, the
|
||||||
|
//! storage-layer crate this crate is built on; storage backends produce
|
||||||
|
//! `Table`s that operators here consume.
|
||||||
|
|
||||||
pub mod atom;
|
pub mod atom;
|
||||||
pub mod join;
|
pub mod join;
|
||||||
pub mod relation;
|
pub mod relation;
|
||||||
pub mod table;
|
|
||||||
pub mod value;
|
|
||||||
|
|||||||
@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use crate::value::Value;
|
use query_storage::value::Value;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Relation {
|
pub struct Relation {
|
||||||
|
|||||||
@ -15,8 +15,8 @@
|
|||||||
|
|
||||||
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
||||||
use query_ops::join::{natural_join, semijoin};
|
use query_ops::join::{natural_join, semijoin};
|
||||||
use query_ops::table::Table;
|
use query_storage::table::Table;
|
||||||
use query_ops::value::Value;
|
use query_storage::value::Value;
|
||||||
|
|
||||||
fn s(x: &str) -> Value {
|
fn s(x: &str) -> Value {
|
||||||
Value::Str(x.to_string())
|
Value::Str(x.to_string())
|
||||||
|
|||||||
40
crates/query-ops/tests/storage_bridge.rs
Normal file
40
crates/query-ops/tests/storage_bridge.rs
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
//! End-to-end: load rows into [`MemoryStorage`], scan as a [`Table`],
|
||||||
|
//! run [`scan_atom`] against it.
|
||||||
|
//!
|
||||||
|
//! Demonstrates that `query-ops` operators can consume from a storage backend
|
||||||
|
//! through the [`scan_as_table`] bridge, with no changes to `query-ops` itself.
|
||||||
|
|
||||||
|
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
||||||
|
use query_storage::table::Table;
|
||||||
|
use query_storage::value::Value;
|
||||||
|
use query_storage::{MemoryStorage, Storage, StorageError, scan_as_table};
|
||||||
|
|
||||||
|
fn i(x: i64) -> Value {
|
||||||
|
Value::Int(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn var(name: &str) -> Term {
|
||||||
|
Term::Var(name.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Loads four `edge` rows into a [`MemoryStorage`], bridges the relation into a
/// [`Table`], and checks that the pattern `edge(X, X)` keeps only the rows whose
/// two columns are equal.
#[test]
fn scan_atom_consumes_from_memory_backend() -> Result<(), StorageError> {
    let mut storage = MemoryStorage::new();
    storage.create_relation("edge", 2)?;
    for (src, dst) in [(1, 2), (2, 2), (3, 3), (4, 5)] {
        storage.insert("edge", vec![i(src), i(dst)])?;
    }

    let edge_table: Table = scan_as_table(&storage, "edge")?;
    // Repeating `X` in both positions asks for rows with equal columns.
    let pattern = AtomPattern {
        columns: vec![var("X"), var("X")],
    };
    let self_loops = scan_atom(&edge_table, &pattern);

    assert_eq!(self_loops.columns, vec!["X".to_string()]);
    assert_eq!(self_loops.rows, vec![vec![i(2)], vec![i(3)]]);
    Ok(())
}
|
||||||
30
crates/query-storage/Cargo.toml
Normal file
30
crates/query-storage/Cargo.toml
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
[package]
|
||||||
|
name = "query-storage"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
rust-version.workspace = true
|
||||||
|
|
||||||
|
[lints.rust]
|
||||||
|
unsafe_code = "deny"
|
||||||
|
|
||||||
|
[lints.clippy]
|
||||||
|
pedantic = "warn"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
lmdb = ["dep:heed"]
|
||||||
|
redb = ["dep:redb"]
|
||||||
|
fjall = ["dep:fjall"]
|
||||||
|
sled = ["dep:sled"]
|
||||||
|
geomerge = ["dep:geomerge"]
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
heed = { version = "0.20", optional = true }
|
||||||
|
redb = { version = "2", optional = true }
|
||||||
|
fjall = { version = "2", optional = true }
|
||||||
|
sled = { version = "0.34", optional = true }
|
||||||
|
geomerge = { path = "../../external/geomerge/crates/geomerge", optional = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = "3"
|
||||||
262
crates/query-storage/src/codec.rs
Normal file
262
crates/query-storage/src/codec.rs
Normal file
@ -0,0 +1,262 @@
|
|||||||
|
//! Wire format shared by every byte-oriented backend in this crate.
|
||||||
|
//!
|
||||||
|
//! The encoding is hand-rolled (no `serde`, no `bincode`) so that the
|
||||||
|
//! generated bytes are stable and inspectable. It is **not** versioned: adding
|
||||||
|
//! a new [`Value`] variant invalidates previously-stored data. That is fine
|
||||||
|
//! for a playground; production code would prepend a format byte.
|
||||||
|
//!
|
||||||
|
//! ## Row Format
|
||||||
|
//!
|
||||||
|
//! `[count: u32 LE] [val × count]`
|
||||||
|
//!
|
||||||
|
//! ## Value Format
|
||||||
|
//!
|
||||||
|
//! `[tag: u8] [payload]`
|
||||||
|
//!
|
||||||
|
//! | Tag | Variant | Payload |
|
||||||
|
//! |--------|---------------|--------------------------------------|
|
||||||
|
//! | `0x00` | `Value::Int` | `i64 LE` (8 bytes) |
|
||||||
|
//! | `0x01` | `Value::Str` | `[len: u32 LE] [bytes]` |
|
||||||
|
//!
|
||||||
|
//! ## Row Key Format
|
||||||
|
//!
|
||||||
|
//! Synthetic row IDs are `u64` encoded big-endian so lexicographic key order
|
||||||
|
//! matches insertion order. Backends with named sub-stores per relation can
|
||||||
|
//! use this directly as the key.
|
||||||
|
//!
|
||||||
|
//! ## Metadata Format
|
||||||
|
//!
|
||||||
|
//! Per-relation metadata is `[arity: u32 LE] [next_id: u64 LE]` = 12 bytes.
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
/// Errors raised by [`decode_row`] and [`decode_meta`].
///
/// The type is `Clone + PartialEq + Eq` so callers and tests can compare
/// errors directly instead of pattern-matching on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CodecError {
    /// The byte slice ended before the expected number of fields was read.
    UnexpectedEof,
    /// A value tag byte was unrecognized.
    UnknownTag(u8),
    /// A length field declared more bytes than the slice contains.
    LengthOverrun {
        /// Payload length announced by the length prefix.
        declared: usize,
        /// Number of bytes that were actually left in the input.
        available: usize,
    },
    /// A UTF-8 string payload could not be decoded.
    InvalidUtf8,
}

impl std::fmt::Display for CodecError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnexpectedEof => write!(f, "unexpected end of bytes"),
            Self::UnknownTag(t) => write!(f, "unknown value tag: 0x{t:02x}"),
            Self::LengthOverrun {
                declared,
                available,
            } => write!(
                f,
                "declared length {declared} exceeds available {available} bytes"
            ),
            Self::InvalidUtf8 => write!(f, "invalid UTF-8 in string payload"),
        }
    }
}

impl std::error::Error for CodecError {}
|
||||||
|
|
||||||
|
/// Encode a row of [`Value`]s to bytes.
|
||||||
|
#[must_use]
|
||||||
|
pub fn encode_row(row: &[Value]) -> Vec<u8> {
|
||||||
|
let mut out = Vec::with_capacity(4 + row.len() * 9);
|
||||||
|
out.extend_from_slice(&u32::try_from(row.len()).unwrap_or(u32::MAX).to_le_bytes());
|
||||||
|
for value in row {
|
||||||
|
match value {
|
||||||
|
Value::Int(i) => {
|
||||||
|
out.push(0x00);
|
||||||
|
out.extend_from_slice(&i.to_le_bytes());
|
||||||
|
}
|
||||||
|
Value::Str(s) => {
|
||||||
|
out.push(0x01);
|
||||||
|
let bytes = s.as_bytes();
|
||||||
|
out.extend_from_slice(
|
||||||
|
&u32::try_from(bytes.len()).unwrap_or(u32::MAX).to_le_bytes(),
|
||||||
|
);
|
||||||
|
out.extend_from_slice(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decode a row of [`Value`]s from bytes.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`CodecError`] if the byte slice is malformed.
|
||||||
|
pub fn decode_row(mut bytes: &[u8]) -> Result<Vec<Value>, CodecError> {
|
||||||
|
let count = read_u32(&mut bytes)? as usize;
|
||||||
|
let mut row = Vec::with_capacity(count);
|
||||||
|
for _ in 0..count {
|
||||||
|
row.push(read_value(&mut bytes)?);
|
||||||
|
}
|
||||||
|
Ok(row)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_value(bytes: &mut &[u8]) -> Result<Value, CodecError> {
|
||||||
|
let tag = read_u8(bytes)?;
|
||||||
|
match tag {
|
||||||
|
0x00 => {
|
||||||
|
let i = read_i64(bytes)?;
|
||||||
|
Ok(Value::Int(i))
|
||||||
|
}
|
||||||
|
0x01 => {
|
||||||
|
let len = read_u32(bytes)? as usize;
|
||||||
|
if bytes.len() < len {
|
||||||
|
return Err(CodecError::LengthOverrun {
|
||||||
|
declared: len,
|
||||||
|
available: bytes.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let (head, tail) = bytes.split_at(len);
|
||||||
|
*bytes = tail;
|
||||||
|
let s = std::str::from_utf8(head)
|
||||||
|
.map_err(|_| CodecError::InvalidUtf8)?
|
||||||
|
.to_string();
|
||||||
|
Ok(Value::Str(s))
|
||||||
|
}
|
||||||
|
other => Err(CodecError::UnknownTag(other)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_u8(bytes: &mut &[u8]) -> Result<u8, CodecError> {
|
||||||
|
let (head, tail) = bytes.split_first().ok_or(CodecError::UnexpectedEof)?;
|
||||||
|
*bytes = tail;
|
||||||
|
Ok(*head)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_u32(bytes: &mut &[u8]) -> Result<u32, CodecError> {
|
||||||
|
if bytes.len() < 4 {
|
||||||
|
return Err(CodecError::UnexpectedEof);
|
||||||
|
}
|
||||||
|
let (head, tail) = bytes.split_at(4);
|
||||||
|
*bytes = tail;
|
||||||
|
let mut buf = [0u8; 4];
|
||||||
|
buf.copy_from_slice(head);
|
||||||
|
Ok(u32::from_le_bytes(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_u64(bytes: &mut &[u8]) -> Result<u64, CodecError> {
|
||||||
|
if bytes.len() < 8 {
|
||||||
|
return Err(CodecError::UnexpectedEof);
|
||||||
|
}
|
||||||
|
let (head, tail) = bytes.split_at(8);
|
||||||
|
*bytes = tail;
|
||||||
|
let mut buf = [0u8; 8];
|
||||||
|
buf.copy_from_slice(head);
|
||||||
|
Ok(u64::from_le_bytes(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_i64(bytes: &mut &[u8]) -> Result<i64, CodecError> {
|
||||||
|
if bytes.len() < 8 {
|
||||||
|
return Err(CodecError::UnexpectedEof);
|
||||||
|
}
|
||||||
|
let (head, tail) = bytes.split_at(8);
|
||||||
|
*bytes = tail;
|
||||||
|
let mut buf = [0u8; 8];
|
||||||
|
buf.copy_from_slice(head);
|
||||||
|
Ok(i64::from_le_bytes(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encode a row key from a synthetic u64 ID.
///
/// Big-endian so lexicographic key order matches insertion order: comparing
/// the returned arrays byte by byte gives the same result as comparing the
/// IDs numerically.
#[must_use]
pub fn row_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
|
||||||
|
|
||||||
|
/// Encode per-relation metadata: arity and next row ID.
///
/// Layout is 12 bytes: `arity` as a little-endian `u32` in bytes `0..4`,
/// then `next_id` as a little-endian `u64` in bytes `4..12`.
#[must_use]
pub fn encode_meta(arity: u32, next_id: u64) -> [u8; 12] {
    let mut out = [0u8; 12];
    let (arity_bytes, next_id_bytes) = out.split_at_mut(4);
    arity_bytes.copy_from_slice(&arity.to_le_bytes());
    next_id_bytes.copy_from_slice(&next_id.to_le_bytes());
    out
}
|
||||||
|
|
||||||
|
/// Decode per-relation metadata.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`CodecError::UnexpectedEof`] if the slice is shorter than 12 bytes.
|
||||||
|
pub fn decode_meta(mut bytes: &[u8]) -> Result<(u32, u64), CodecError> {
|
||||||
|
let arity = read_u32(&mut bytes)?;
|
||||||
|
let next_id = read_u64(&mut bytes)?;
|
||||||
|
Ok((arity, next_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Shorthand for a string cell.
    fn s(x: &str) -> Value {
        Value::Str(x.to_string())
    }

    #[test]
    fn encode_decode_int_only_row() -> Result<(), CodecError> {
        let row = vec![i(1), i(-2), i(i64::MAX)];
        let bytes = encode_row(&row);
        let decoded = decode_row(&bytes)?;
        assert_eq!(decoded, row);
        Ok(())
    }

    #[test]
    fn encode_decode_mixed_row() -> Result<(), CodecError> {
        let row = vec![s("Alice"), i(42), s("a longer string with spaces")];
        let bytes = encode_row(&row);
        let decoded = decode_row(&bytes)?;
        assert_eq!(decoded, row);
        Ok(())
    }

    #[test]
    fn encode_decode_empty_row() -> Result<(), CodecError> {
        let bytes = encode_row(&[]);
        let decoded = decode_row(&bytes)?;
        assert!(decoded.is_empty());
        Ok(())
    }

    #[test]
    fn decode_unknown_tag_fails() {
        // count = 1, then a tag byte no variant uses.
        let bytes = vec![1, 0, 0, 0, 0xFF];
        assert!(matches!(
            decode_row(&bytes),
            Err(CodecError::UnknownTag(0xFF))
        ));
    }

    #[test]
    fn decode_truncated_fails() {
        // count = 1, Int tag, but only one of the eight payload bytes.
        let bytes = vec![1, 0, 0, 0, 0x00, 0x01];
        assert!(matches!(decode_row(&bytes), Err(CodecError::UnexpectedEof)));
    }

    #[test]
    fn decode_string_length_overrun_fails() {
        // count = 1, Str tag, declared length 5, but a single payload byte.
        let bytes = vec![1, 0, 0, 0, 0x01, 5, 0, 0, 0, b'a'];
        assert!(matches!(
            decode_row(&bytes),
            Err(CodecError::LengthOverrun {
                declared: 5,
                available: 1
            })
        ));
    }

    #[test]
    fn decode_invalid_utf8_fails() {
        // count = 1, Str tag, length 1, payload 0xFF is never valid UTF-8.
        let bytes = vec![1, 0, 0, 0, 0x01, 1, 0, 0, 0, 0xFF];
        assert!(matches!(decode_row(&bytes), Err(CodecError::InvalidUtf8)));
    }

    #[test]
    fn row_key_preserves_order() {
        assert!(row_key(1) < row_key(2));
        assert!(row_key(255) < row_key(256));
        assert!(row_key(u64::MAX - 1) < row_key(u64::MAX));
    }

    #[test]
    fn meta_roundtrip() -> Result<(), CodecError> {
        let encoded = encode_meta(3, 12345);
        let (arity, next_id) = decode_meta(&encoded)?;
        assert_eq!(arity, 3);
        assert_eq!(next_id, 12345);
        Ok(())
    }
}
|
||||||
149
crates/query-storage/src/fjall.rs
Normal file
149
crates/query-storage/src/fjall.rs
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
//! fjall adapter.
|
||||||
|
//!
|
||||||
|
//! Each relation gets a fjall [`PartitionHandle`](fjall::PartitionHandle) of
|
||||||
|
//! the same name. A reserved partition named `__meta` carries per-relation
|
||||||
|
//! metadata (arity and next synthetic row ID).
|
||||||
|
|
||||||
|
use fjall::{Keyspace, PartitionCreateOptions, PartitionHandle};
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
const META_PARTITION: &str = "__meta";
|
||||||
|
|
||||||
|
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
||||||
|
StorageError::Backend(Box::new(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// fjall-backed [`Storage`] implementation.
|
||||||
|
pub struct FjallStorage {
|
||||||
|
keyspace: Keyspace,
|
||||||
|
meta: PartitionHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FjallStorage {
|
||||||
|
/// Open or create a fjall keyspace at `path`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::Backend`] if fjall fails to open the path.
|
||||||
|
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
||||||
|
let keyspace = fjall::Config::new(path).open().map_err(backend)?;
|
||||||
|
let meta = keyspace
|
||||||
|
.open_partition(META_PARTITION, PartitionCreateOptions::default())
|
||||||
|
.map_err(backend)?;
|
||||||
|
Ok(Self { keyspace, meta })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn relation_partition(&self, name: &str) -> Result<PartitionHandle, StorageError> {
|
||||||
|
self.keyspace
|
||||||
|
.open_partition(name, PartitionCreateOptions::default())
|
||||||
|
.map_err(backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
|
||||||
|
let raw = self
|
||||||
|
.meta
|
||||||
|
.get(name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
Ok(decode_meta(raw.as_ref())?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
|
||||||
|
self.meta
|
||||||
|
.insert(name.as_bytes(), encode_meta(arity, next_id))
|
||||||
|
.map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for FjallStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if name == META_PARTITION {
|
||||||
|
return Err(StorageError::Validation(format!(
|
||||||
|
"relation name '{name}' is reserved"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
if self.meta.contains_key(name.as_bytes()).map_err(backend)? {
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
let arity_u32 = u32::try_from(arity)
|
||||||
|
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
||||||
|
self.store_meta(name, arity_u32, 0)?;
|
||||||
|
let _ = self.relation_partition(name)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
let (arity, _) = self.load_meta(name)?;
|
||||||
|
Ok(arity as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
let _ = self.load_meta(name)?;
|
||||||
|
let partition = self.relation_partition(name)?;
|
||||||
|
let mut rows = Vec::new();
|
||||||
|
for entry in partition.iter() {
|
||||||
|
let (_, value) = entry.map_err(backend)?;
|
||||||
|
rows.push(decode_row(value.as_ref())?);
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let (arity, next_id) = self.load_meta(name)?;
|
||||||
|
if row.len() != arity as usize {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: arity as usize,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let partition = self.relation_partition(name)?;
|
||||||
|
partition
|
||||||
|
.insert(row_key(next_id), encode_row(&row))
|
||||||
|
.map_err(backend)?;
|
||||||
|
self.store_meta(name, arity, next_id + 1)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A storage handle bundled with the temporary directory backing it.
    ///
    /// Struct fields drop in declaration order, so the storage is closed
    /// before the directory guard removes the files. Keeping the guard alive
    /// (rather than forgetting it) means test runs no longer leave temporary
    /// directories behind.
    struct TempStorage {
        storage: FjallStorage,
        _dir: tempfile::TempDir,
    }

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Open a storage rooted in a fresh temporary directory.
    fn open_temp() -> Result<TempStorage, StorageError> {
        let dir = tempfile::tempdir().map_err(backend)?;
        let storage = FjallStorage::open(dir.path())?;
        Ok(TempStorage { storage, _dir: dir })
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        let mut temp = open_temp()?;
        let storage = &mut temp.storage;
        storage.create_relation("edge", 2)?;
        storage.insert("edge", vec![i(1), i(2)])?;
        storage.insert("edge", vec![i(2), i(3)])?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
        assert_eq!(storage.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let mut temp = open_temp()?;
        let storage = &mut temp.storage;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.create_relation("edge", 2),
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }
}
|
||||||
252
crates/query-storage/src/geomerge.rs
Normal file
252
crates/query-storage/src/geomerge.rs
Normal file
@ -0,0 +1,252 @@
|
|||||||
|
//! Geomerge adapter.
|
||||||
|
//!
|
||||||
|
//! Unlike the other backends, geomerge schemas are **immutable after store
|
||||||
|
//! construction**: there is no public API to register a new table on a live
|
||||||
|
//! `Store`. The adapter therefore expects all relations to be declared up
|
||||||
|
//! front via a `FlatTheory` passed to [`GeomergeStorage::from_theory`], and
|
||||||
|
//! [`Storage::create_relation`] becomes a verifier that the relation exists
|
||||||
|
//! in the loaded theory and that its arity matches.
|
||||||
|
//!
|
||||||
|
//! Additional v1 mismatches with the trait:
|
||||||
|
//!
|
||||||
|
//! - Column types are typed (`PrimInt` / `PrimString`) in geomerge but the
|
||||||
|
//! trait's `create_relation` only carries `arity`. The adapter cannot
|
||||||
|
//! declare a relation at runtime, so this issue surfaces only at insert
|
||||||
|
//! time when geomerge rejects a row with `StorageError::Validation`.
|
||||||
|
//! - Cells of type `CellValue::Id` cannot be represented in our `Value` enum.
|
||||||
|
//! Scanning a table that contains such cells returns `StorageError::Validation`.
|
||||||
|
//! - Every `insert` opens a fresh `Transaction` and commits. Law violations
|
||||||
|
//! surface at commit time, not at the `add` call.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use geomerge::ir::{self, Path};
|
||||||
|
use geomerge::store::Store;
|
||||||
|
use geomerge::table::CellValue;
|
||||||
|
use geomerge::txn::ops::TxnCellValue;
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
||||||
|
StorageError::Backend(Box::new(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validation(msg: impl Into<String>) -> StorageError {
|
||||||
|
StorageError::Validation(msg.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Geomerge-backed [`Storage`] implementation.
|
||||||
|
///
|
||||||
|
/// Construct via [`GeomergeStorage::new`] (empty store, no relations) or
|
||||||
|
/// [`GeomergeStorage::from_theory`] (preloaded with a `FlatTheory`).
|
||||||
|
pub struct GeomergeStorage {
|
||||||
|
store: Store,
|
||||||
|
declared: HashSet<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for GeomergeStorage {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GeomergeStorage {
|
||||||
|
/// Build an empty store. No relations are available until the store is
|
||||||
|
/// rebuilt via a theory.
|
||||||
|
#[must_use]
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
store: Store::new(),
|
||||||
|
declared: HashSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a store from a pre-defined `FlatTheory`. All `create_relation`
|
||||||
|
/// calls must reference relations declared in the theory.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::Backend`] if geomerge rejects the theory.
|
||||||
|
pub fn from_theory(theory: ir::FlatTheory) -> Result<Self, StorageError> {
|
||||||
|
let store = Store::try_from_theory(theory).map_err(|e| backend(*e))?;
|
||||||
|
Ok(Self {
|
||||||
|
store,
|
||||||
|
declared: HashSet::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for GeomergeStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if self.declared.contains(name) {
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
let path: Path = name.into();
|
||||||
|
let table = self.store.table_at(&path).ok_or_else(|| {
|
||||||
|
validation(format!(
|
||||||
|
"relation '{name}' is not declared in the loaded geomerge theory; \
|
||||||
|
geomerge does not support runtime relation creation"
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
let declared_arity = table.schema().columns.len();
|
||||||
|
if declared_arity != arity {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: declared_arity,
|
||||||
|
got: arity,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
self.declared.insert(name.to_string());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
let path: Path = name.into();
|
||||||
|
let table = self
|
||||||
|
.store
|
||||||
|
.table_at(&path)
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
Ok(table.schema().columns.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
let path: Path = name.into();
|
||||||
|
let table = self
|
||||||
|
.store
|
||||||
|
.table_at(&path)
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
let arity = table.schema().columns.len();
|
||||||
|
let mut rows = Vec::with_capacity(table.row_count());
|
||||||
|
for r in 0..table.row_count() {
|
||||||
|
let mut row = Vec::with_capacity(arity);
|
||||||
|
for c in 0..arity {
|
||||||
|
let cell = table
|
||||||
|
.cell_at(r, c)
|
||||||
|
.ok_or_else(|| validation(format!("missing cell at ({r}, {c}) in '{name}'")))?;
|
||||||
|
row.push(cell_to_value(cell)?);
|
||||||
|
}
|
||||||
|
rows.push(row);
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let path: Path = name.into();
|
||||||
|
let arity = self.arity(name)?;
|
||||||
|
if row.len() != arity {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: arity,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let values: Vec<TxnCellValue> = row.into_iter().map(value_to_txn_cell).collect();
|
||||||
|
let mut txn = self.store.transaction();
|
||||||
|
txn.add(&path, values)
|
||||||
|
.map_err(|e| validation(e.to_string()))?;
|
||||||
|
// Law violations surface here at commit time, not at add time.
|
||||||
|
txn.commit().map_err(|e| validation(e.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cell_to_value(cell: &CellValue) -> Result<Value, StorageError> {
|
||||||
|
match cell {
|
||||||
|
CellValue::Int(i) => Ok(Value::Int(*i)),
|
||||||
|
CellValue::Str(s) => Ok(Value::Str(s.clone())),
|
||||||
|
CellValue::Id(_) => Err(validation(
|
||||||
|
"geomerge CellValue::Id cannot be represented in the playground's Value enum",
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn value_to_txn_cell(value: Value) -> TxnCellValue {
|
||||||
|
match value {
|
||||||
|
Value::Int(i) => TxnCellValue::Int(i),
|
||||||
|
Value::Str(s) => TxnCellValue::Str(s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use geomerge::ir::{ColType, FlatTheory, PrimType, Schema, TableEntry};

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Schema with `arity` integer columns and no primary key.
    fn int_schema(arity: usize) -> Schema {
        let columns = std::iter::repeat_with(|| ColType::PrimType {
            prim: PrimType::PrimInt,
        })
        .take(arity)
        .collect();
        Schema {
            columns,
            primary_key: None,
        }
    }

    /// Theory holding a single all-integer table and no laws.
    fn theory_with_one_int_table(name: &str, arity: usize) -> FlatTheory {
        let entry = TableEntry {
            path: name.into(),
            table: int_schema(arity),
        };
        FlatTheory {
            tables: vec![entry],
            laws: Vec::new(),
        }
    }

    /// Storage preloaded with a two-column integer table called `edge`.
    fn edge_storage() -> Result<GeomergeStorage, StorageError> {
        GeomergeStorage::from_theory(theory_with_one_int_table("edge", 2))
    }

    #[test]
    fn empty_store_has_no_relations() {
        let storage = GeomergeStorage::new();
        let outcome = storage.arity("edge");
        assert!(matches!(outcome, Err(StorageError::RelationNotFound(_))));
    }

    #[test]
    fn create_relation_on_undeclared_returns_validation_error() {
        let mut storage = GeomergeStorage::new();
        let outcome = storage.create_relation("edge", 2);
        assert!(matches!(outcome, Err(StorageError::Validation(_))));
    }

    #[test]
    fn theory_loaded_insert_scan_roundtrip() -> Result<(), StorageError> {
        let mut storage = edge_storage()?;
        storage.create_relation("edge", 2)?;
        storage.insert("edge", vec![i(1), i(2)])?;
        storage.insert("edge", vec![i(3), i(4)])?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(3), i(4)]]);
        assert_eq!(storage.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let mut storage = edge_storage()?;
        storage.create_relation("edge", 2)?;
        let second = storage.create_relation("edge", 2);
        assert!(matches!(second, Err(StorageError::RelationExists(_))));
        Ok(())
    }

    #[test]
    fn insert_wrong_type_returns_validation_error() -> Result<(), StorageError> {
        let mut storage = edge_storage()?;
        storage.create_relation("edge", 2)?;
        // Insert a Str into an Int column: geomerge rejects it.
        let result = storage.insert("edge", vec![Value::Str("not an int".to_string()), i(2)]);
        assert!(matches!(result, Err(StorageError::Validation(_))));
        Ok(())
    }
}
|
||||||
145
crates/query-storage/src/lib.rs
Normal file
145
crates/query-storage/src/lib.rs
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
//! Storage layer for the query-plan playground.
|
||||||
|
//!
|
||||||
|
//! This is the foundational crate of the workspace. It owns the [`Value`] cell
|
||||||
|
//! type and the [`Table`] container, defines the [`Storage`] trait, and ships
|
||||||
|
//! adapters for several backends behind Cargo features. Higher-level crates
|
||||||
|
//! such as `query-ops` depend on this crate for both the types and the trait.
|
||||||
|
//!
|
||||||
|
//! The v1 trait surface is deliberately narrow: create a relation, scan all
|
||||||
|
//! rows, insert a row, ask for arity. Transactions, range scans, deletes, and
|
||||||
|
//! delta streams are not modeled yet, and will be added when a specific
|
||||||
|
//! experiment demands them.
|
||||||
|
//!
|
||||||
|
//! ## Backends
|
||||||
|
//!
|
||||||
|
//! [`MemoryStorage`] is always available. Other backends are gated behind
|
||||||
|
//! Cargo features so users only pay for what they need:
|
||||||
|
//!
|
||||||
|
//! - `lmdb` — LMDB via the `heed` crate
|
||||||
|
//! - `redb` — pure-Rust embedded KV
|
||||||
|
//! - `fjall` — pure-Rust LSM-tree
|
||||||
|
//! - `sled` — pure-Rust embedded KV (B+tree-like index over a log-structured store)
|
||||||
|
//! - `geomerge` — the workspace's `geomerge` crate
|
||||||
|
|
||||||
|
use crate::table::Table;
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
pub mod codec;
|
||||||
|
pub mod memory;
|
||||||
|
pub mod table;
|
||||||
|
pub mod value;
|
||||||
|
|
||||||
|
#[cfg(feature = "sled")]
|
||||||
|
pub mod sled;
|
||||||
|
|
||||||
|
#[cfg(feature = "redb")]
|
||||||
|
pub mod redb;
|
||||||
|
|
||||||
|
#[cfg(feature = "fjall")]
|
||||||
|
pub mod fjall;
|
||||||
|
|
||||||
|
#[cfg(feature = "lmdb")]
|
||||||
|
pub mod lmdb;
|
||||||
|
|
||||||
|
#[cfg(feature = "geomerge")]
|
||||||
|
pub mod geomerge;
|
||||||
|
|
||||||
|
pub use memory::MemoryStorage;
|
||||||
|
|
||||||
|
/// Errors returned by a [`Storage`] backend.
|
||||||
|
///
|
||||||
|
/// Backend-specific failures (LMDB transaction aborts, sled I/O errors, etc.)
|
||||||
|
/// are wrapped in [`StorageError::Backend`].
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum StorageError {
|
||||||
|
/// No relation with the given name exists in this backend.
|
||||||
|
RelationNotFound(String),
|
||||||
|
/// A relation with the given name already exists.
|
||||||
|
RelationExists(String),
|
||||||
|
/// A row was offered with the wrong number of columns.
|
||||||
|
ArityMismatch { expected: usize, got: usize },
|
||||||
|
/// A backend-defined validation rule rejected the operation, for example
|
||||||
|
/// a `geomerge` law violation.
|
||||||
|
Validation(String),
|
||||||
|
/// A row decoded from storage was malformed.
|
||||||
|
Decode(codec::CodecError),
|
||||||
|
/// A backend-specific error wrapped for transport across the trait.
|
||||||
|
Backend(Box<dyn std::error::Error + Send + Sync>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for StorageError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::RelationNotFound(name) => write!(f, "relation not found: {name}"),
|
||||||
|
Self::RelationExists(name) => write!(f, "relation already exists: {name}"),
|
||||||
|
Self::ArityMismatch { expected, got } => {
|
||||||
|
write!(f, "arity mismatch: expected {expected}, got {got}")
|
||||||
|
}
|
||||||
|
Self::Validation(msg) => write!(f, "validation failed: {msg}"),
|
||||||
|
Self::Decode(err) => write!(f, "decode error: {err}"),
|
||||||
|
Self::Backend(err) => write!(f, "backend error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for StorageError {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
Self::Backend(err) => Some(err.as_ref()),
|
||||||
|
Self::Decode(err) => Some(err),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<codec::CodecError> for StorageError {
|
||||||
|
fn from(err: codec::CodecError) -> Self {
|
||||||
|
Self::Decode(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Backend-agnostic interface for storing and retrieving rows.
|
||||||
|
///
|
||||||
|
/// Each relation has a fixed name, a fixed arity (row width), and an ordered
|
||||||
|
/// collection of rows whose cells are [`Value`]s. Concrete implementations
|
||||||
|
/// include [`MemoryStorage`] in this crate plus the feature-gated backends.
|
||||||
|
pub trait Storage {
|
||||||
|
/// Create a new relation with the given name and arity.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::RelationExists`] if a relation with the given
|
||||||
|
/// name already exists.
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError>;
|
||||||
|
|
||||||
|
/// Return the arity of the given relation.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError>;
|
||||||
|
|
||||||
|
/// Scan all rows of the given relation in storage order.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError>;
|
||||||
|
|
||||||
|
/// Append a row to the given relation.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::RelationNotFound`] if no such relation exists,
|
||||||
|
/// [`StorageError::ArityMismatch`] if the row's length differs from the
|
||||||
|
/// declared arity, or [`StorageError::Validation`] / [`StorageError::Backend`]
|
||||||
|
/// if a backend-specific rule rejects the row.
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Materialize a relation from a [`Storage`] backend as a [`Table`] that
|
||||||
|
/// query-language operators can consume.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns any error produced by [`Storage::arity`] or [`Storage::scan`].
|
||||||
|
pub fn scan_as_table(storage: &dyn Storage, name: &str) -> Result<Table, StorageError> {
|
||||||
|
let arity = storage.arity(name)?;
|
||||||
|
let rows = storage.scan(name)?;
|
||||||
|
Ok(Table::from_rows(arity, rows))
|
||||||
|
}
|
||||||
201
crates/query-storage/src/lmdb.rs
Normal file
201
crates/query-storage/src/lmdb.rs
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
//! LMDB adapter via the `heed` crate.
|
||||||
|
//!
|
||||||
|
//! Maps each relation onto a named LMDB sub-database of the same name. A
|
||||||
|
//! reserved sub-database named `__meta` carries per-relation metadata (arity
|
||||||
|
//! and next synthetic row ID).
|
||||||
|
//!
|
||||||
|
//! Note: every [`Storage::insert`] opens its own write transaction. LMDB
|
||||||
|
//! serializes writers across the env, so per-row inserts will be slow on real
|
||||||
|
//! workloads. The v1 trait does not yet expose batch inserts.
|
||||||
|
|
||||||
|
use heed::types::Bytes;
|
||||||
|
use heed::{Database, Env, EnvOpenOptions};
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
/// Name of the reserved sub-database holding per-relation metadata; it cannot
/// be used as a relation name.
const META_DB: &str = "__meta";
|
||||||
|
/// Value passed to `EnvOpenOptions::max_dbs` when the environment is opened.
const DEFAULT_MAX_DBS: u32 = 128;
|
||||||
|
/// Value passed to `EnvOpenOptions::map_size` when the environment is opened
/// (100 MiB).
const DEFAULT_MAP_SIZE: usize = 100 * 1024 * 1024;
|
||||||
|
|
||||||
|
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
||||||
|
StorageError::Backend(Box::new(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// LMDB-backed [`Storage`] implementation.
|
||||||
|
pub struct LmdbStorage {
|
||||||
|
env: Env,
|
||||||
|
meta: Database<Bytes, Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LmdbStorage {
|
||||||
|
/// Open or create an LMDB environment at `path`.
|
||||||
|
///
|
||||||
|
/// The path must already exist as a directory; LMDB will create its data
|
||||||
|
/// files inside it.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::Backend`] if LMDB fails to open.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
/// Internally uses `EnvOpenOptions::open`, which heed marks `unsafe`
|
||||||
|
/// because the memory-mapped file's contents can be modified by other
|
||||||
|
/// processes. The adapter assumes single-process exclusive access.
|
||||||
|
#[allow(unsafe_code)]
|
||||||
|
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
||||||
|
// SAFETY: heed marks `open` unsafe because the mmap'd file's contents
|
||||||
|
// can be modified by other processes, violating Rust's aliasing rules.
|
||||||
|
// This adapter assumes single-process exclusive access to the path,
|
||||||
|
// which holds for tests and typical playground use.
|
||||||
|
let env = unsafe {
|
||||||
|
EnvOpenOptions::new()
|
||||||
|
.max_dbs(DEFAULT_MAX_DBS)
|
||||||
|
.map_size(DEFAULT_MAP_SIZE)
|
||||||
|
.open(path)
|
||||||
|
.map_err(backend)?
|
||||||
|
};
|
||||||
|
let mut wtxn = env.write_txn().map_err(backend)?;
|
||||||
|
let meta: Database<Bytes, Bytes> = env
|
||||||
|
.create_database(&mut wtxn, Some(META_DB))
|
||||||
|
.map_err(backend)?;
|
||||||
|
wtxn.commit().map_err(backend)?;
|
||||||
|
Ok(Self { env, meta })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_relation_db(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut heed::RwTxn,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Database<Bytes, Bytes>, StorageError> {
|
||||||
|
self.env.create_database(wtxn, Some(name)).map_err(backend)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for LmdbStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if name == META_DB {
|
||||||
|
return Err(StorageError::Validation(format!(
|
||||||
|
"relation name '{name}' is reserved"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let arity_u32 = u32::try_from(arity)
|
||||||
|
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
||||||
|
let mut wtxn = self.env.write_txn().map_err(backend)?;
|
||||||
|
if self
|
||||||
|
.meta
|
||||||
|
.get(&wtxn, name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.is_some()
|
||||||
|
{
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
let encoded = encode_meta(arity_u32, 0);
|
||||||
|
self.meta
|
||||||
|
.put(&mut wtxn, name.as_bytes(), &encoded[..])
|
||||||
|
.map_err(backend)?;
|
||||||
|
let _ = self.open_relation_db(&mut wtxn, name)?;
|
||||||
|
wtxn.commit().map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
let rtxn = self.env.read_txn().map_err(backend)?;
|
||||||
|
let raw = self
|
||||||
|
.meta
|
||||||
|
.get(&rtxn, name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
let (arity, _) = decode_meta(raw)?;
|
||||||
|
Ok(arity as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
let rtxn = self.env.read_txn().map_err(backend)?;
|
||||||
|
if self
|
||||||
|
.meta
|
||||||
|
.get(&rtxn, name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
return Err(StorageError::RelationNotFound(name.to_string()));
|
||||||
|
}
|
||||||
|
let db: Database<Bytes, Bytes> = self
|
||||||
|
.env
|
||||||
|
.open_database(&rtxn, Some(name))
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
let mut rows = Vec::new();
|
||||||
|
for entry in db.iter(&rtxn).map_err(backend)? {
|
||||||
|
let (_, value) = entry.map_err(backend)?;
|
||||||
|
rows.push(decode_row(value)?);
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let mut wtxn = self.env.write_txn().map_err(backend)?;
|
||||||
|
let meta_bytes = self
|
||||||
|
.meta
|
||||||
|
.get(&wtxn, name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
let (arity, next_id) = decode_meta(meta_bytes)?;
|
||||||
|
if row.len() != arity as usize {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: arity as usize,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let db = self.open_relation_db(&mut wtxn, name)?;
|
||||||
|
let key = row_key(next_id);
|
||||||
|
let value = encode_row(&row);
|
||||||
|
db.put(&mut wtxn, &key[..], &value[..]).map_err(backend)?;
|
||||||
|
let new_meta = encode_meta(arity, next_id + 1);
|
||||||
|
self.meta
|
||||||
|
.put(&mut wtxn, name.as_bytes(), &new_meta[..])
|
||||||
|
.map_err(backend)?;
|
||||||
|
wtxn.commit().map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Open an [`LmdbStorage`] inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the storage so the directory
    /// stays alive for the duration of the test and is removed afterwards,
    /// instead of being leaked on disk.
    fn open_temp() -> Result<(tempfile::TempDir, LmdbStorage), StorageError> {
        let dir = tempfile::tempdir().map_err(backend)?;
        let storage = LmdbStorage::open(dir.path())?;
        Ok((dir, storage))
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        // `_dir` is bound first, so it is dropped last, after the environment.
        let (_dir, mut storage) = open_temp()?;
        storage.create_relation("edge", 2)?;
        storage.insert("edge", vec![i(1), i(2)])?;
        storage.insert("edge", vec![i(2), i(3)])?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
        assert_eq!(storage.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let (_dir, mut storage) = open_temp()?;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.create_relation("edge", 2),
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }
}
|
||||||
147
crates/query-storage/src/memory.rs
Normal file
147
crates/query-storage/src/memory.rs
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
//! In-memory backend, keyed by relation name. Always available.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
/// In-memory backend, useful as the default in tests and as a correctness
|
||||||
|
/// oracle for other backends.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct MemoryStorage {
|
||||||
|
relations: HashMap<String, MemoryRelation>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct MemoryRelation {
|
||||||
|
arity: usize,
|
||||||
|
rows: Vec<Vec<Value>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MemoryStorage {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for MemoryStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if self.relations.contains_key(name) {
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
self.relations.insert(
|
||||||
|
name.to_string(),
|
||||||
|
MemoryRelation {
|
||||||
|
arity,
|
||||||
|
rows: Vec::new(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
self.relations
|
||||||
|
.get(name)
|
||||||
|
.map(|r| r.arity)
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
self.relations
|
||||||
|
.get(name)
|
||||||
|
.map(|r| r.rows.clone())
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let relation = self
|
||||||
|
.relations
|
||||||
|
.get_mut(name)
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
if row.len() != relation.arity {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: relation.arity,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
relation.rows.push(row);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan_as_table;

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        let mut store = MemoryStorage::new();
        store.create_relation("edge", 2)?;

        let expected = vec![vec![i(1), i(2)], vec![i(2), i(3)]];
        for row in expected.clone() {
            store.insert("edge", row)?;
        }

        assert_eq!(store.scan("edge")?, expected);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let mut store = MemoryStorage::new();
        store.create_relation("edge", 2)?;

        let second_attempt = store.create_relation("edge", 2);
        assert!(matches!(
            second_attempt,
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }

    #[test]
    fn scan_unknown_relation_returns_err() {
        let store = MemoryStorage::new();
        let outcome = store.scan("missing");
        assert!(matches!(outcome, Err(StorageError::RelationNotFound(_))));
    }

    #[test]
    fn arity_unknown_relation_returns_err() {
        let store = MemoryStorage::new();
        let outcome = store.arity("missing");
        assert!(matches!(outcome, Err(StorageError::RelationNotFound(_))));
    }

    #[test]
    fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
        let mut store = MemoryStorage::new();
        store.create_relation("edge", 2)?;

        // One cell offered to a two-column relation.
        let outcome = store.insert("edge", vec![i(1)]);
        assert!(matches!(
            outcome,
            Err(StorageError::ArityMismatch {
                expected: 2,
                got: 1
            })
        ));
        Ok(())
    }

    #[test]
    fn scan_as_table_materializes_table() -> Result<(), StorageError> {
        let mut store = MemoryStorage::new();
        store.create_relation("edge", 2)?;
        store.insert("edge", vec![i(1), i(2)])?;

        let materialized = scan_as_table(&store, "edge")?;
        assert_eq!(materialized.arity, 2);
        assert_eq!(materialized.rows, vec![vec![i(1), i(2)]]);
        Ok(())
    }
}
|
||||||
183
crates/query-storage/src/redb.rs
Normal file
183
crates/query-storage/src/redb.rs
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
//! redb adapter.
|
||||||
|
//!
|
||||||
|
//! Each relation gets a redb table named after it, keyed by `u64` row IDs.
|
||||||
|
//! A reserved table named `__meta`, keyed by relation name, carries per-relation
|
||||||
|
//! metadata (arity and next synthetic row ID).
|
||||||
|
|
||||||
|
use redb::{Database, ReadableTable, TableDefinition};
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row};
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
/// Name of the reserved redb table holding per-relation metadata; it cannot be
/// used as a relation name.
const META_TABLE: &str = "__meta";
|
||||||
|
|
||||||
|
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
||||||
|
StorageError::Backend(Box::new(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn meta_def() -> TableDefinition<'static, &'static str, &'static [u8]> {
|
||||||
|
TableDefinition::new(META_TABLE)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rows_def(name: &str) -> TableDefinition<'_, u64, &'static [u8]> {
|
||||||
|
TableDefinition::new(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// redb-backed [`Storage`] implementation.
|
||||||
|
pub struct RedbStorage {
|
||||||
|
db: Database,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RedbStorage {
|
||||||
|
/// Open or create a redb database at `path`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::Backend`] if redb fails to open the file.
|
||||||
|
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
||||||
|
let db = Database::create(path).map_err(backend)?;
|
||||||
|
Ok(Self { db })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for RedbStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if name == META_TABLE {
|
||||||
|
return Err(StorageError::Validation(format!(
|
||||||
|
"relation name '{name}' is reserved"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let arity_u32 = u32::try_from(arity)
|
||||||
|
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
||||||
|
let txn = self.db.begin_write().map_err(backend)?;
|
||||||
|
{
|
||||||
|
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
|
||||||
|
if meta.get(name).map_err(backend)?.is_some() {
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
let encoded = encode_meta(arity_u32, 0);
|
||||||
|
meta.insert(name, &encoded[..]).map_err(backend)?;
|
||||||
|
// open_table creates the rows table if it does not exist
|
||||||
|
let _ = txn.open_table(rows_def(name)).map_err(backend)?;
|
||||||
|
}
|
||||||
|
txn.commit().map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
let txn = self.db.begin_read().map_err(backend)?;
|
||||||
|
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
||||||
|
let raw = meta
|
||||||
|
.get(name)
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
let (arity, _) = decode_meta(raw.value())?;
|
||||||
|
Ok(arity as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
let txn = self.db.begin_read().map_err(backend)?;
|
||||||
|
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
||||||
|
if meta.get(name).map_err(backend)?.is_none() {
|
||||||
|
return Err(StorageError::RelationNotFound(name.to_string()));
|
||||||
|
}
|
||||||
|
let table = txn.open_table(rows_def(name)).map_err(backend)?;
|
||||||
|
let mut rows = Vec::new();
|
||||||
|
for entry in table.iter().map_err(backend)? {
|
||||||
|
let (_, value) = entry.map_err(backend)?;
|
||||||
|
rows.push(decode_row(value.value())?);
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let txn = self.db.begin_write().map_err(backend)?;
|
||||||
|
let (arity, next_id) = {
|
||||||
|
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
||||||
|
let entry = meta
|
||||||
|
.get(name)
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
decode_meta(entry.value())?
|
||||||
|
};
|
||||||
|
if row.len() != arity as usize {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: arity as usize,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut rows = txn.open_table(rows_def(name)).map_err(backend)?;
|
||||||
|
let encoded = encode_row(&row);
|
||||||
|
rows.insert(next_id, &encoded[..]).map_err(backend)?;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
|
||||||
|
let new_meta = encode_meta(arity, next_id + 1);
|
||||||
|
meta.insert(name, new_meta.as_ref()).map_err(backend)?;
|
||||||
|
}
|
||||||
|
txn.commit().map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Shorthand for a string cell.
    fn s(x: &str) -> Value {
        Value::Str(x.to_string())
    }

    /// Open a [`RedbStorage`] on a file inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the storage so the directory
    /// stays alive for the duration of the test and is removed afterwards,
    /// instead of being leaked on disk.
    fn open_temp() -> Result<(tempfile::TempDir, RedbStorage), StorageError> {
        let dir = tempfile::tempdir().map_err(backend)?;
        // The file does not have to exist for redb::create.
        let path = dir.path().join("test.redb");
        let storage = RedbStorage::open(&path)?;
        Ok((dir, storage))
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        // `_dir` is bound first, so it is dropped last, after the database.
        let (_dir, mut storage) = open_temp()?;
        storage.create_relation("edge", 2)?;
        storage.insert("edge", vec![i(1), i(2)])?;
        storage.insert("edge", vec![s("hello"), i(7)])?;
        let rows = storage.scan("edge")?;
        assert_eq!(rows, vec![vec![i(1), i(2)], vec![s("hello"), i(7)]]);
        assert_eq!(storage.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let (_dir, mut storage) = open_temp()?;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.create_relation("edge", 2),
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }

    #[test]
    fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
        let (_dir, mut storage) = open_temp()?;
        storage.create_relation("edge", 2)?;
        assert!(matches!(
            storage.insert("edge", vec![i(1)]),
            Err(StorageError::ArityMismatch {
                expected: 2,
                got: 1,
            })
        ));
        Ok(())
    }
}
|
||||||
161
crates/query-storage/src/sled.rs
Normal file
161
crates/query-storage/src/sled.rs
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
//! Sled adapter.
|
||||||
|
//!
|
||||||
|
//! Maps each relation onto a sled [`Tree`](sled::Tree) of the same name. A
|
||||||
|
//! reserved tree named `__meta` carries per-relation metadata (arity and the
|
||||||
|
//! next synthetic row ID).
|
||||||
|
|
||||||
|
use crate::value::Value;
|
||||||
|
|
||||||
|
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
||||||
|
use crate::{Storage, StorageError};
|
||||||
|
|
||||||
|
/// Name of the reserved sled tree holding per-relation metadata; it cannot be
/// used as a relation name.
const META_TREE: &str = "__meta";
|
||||||
|
|
||||||
|
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
||||||
|
StorageError::Backend(Box::new(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sled-backed [`Storage`] implementation.
|
||||||
|
pub struct SledStorage {
|
||||||
|
db: sled::Db,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SledStorage {
|
||||||
|
/// Open or create a sled database at `path`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// Returns [`StorageError::Backend`] if sled fails to open the path.
|
||||||
|
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
||||||
|
let db = sled::open(path).map_err(backend)?;
|
||||||
|
Ok(Self { db })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn meta_tree(&self) -> Result<sled::Tree, StorageError> {
|
||||||
|
self.db.open_tree(META_TREE).map_err(backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn relation_tree(&self, name: &str) -> Result<sled::Tree, StorageError> {
|
||||||
|
self.db.open_tree(name).map_err(backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
|
||||||
|
let meta = self.meta_tree()?;
|
||||||
|
let raw = meta
|
||||||
|
.get(name.as_bytes())
|
||||||
|
.map_err(backend)?
|
||||||
|
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
||||||
|
Ok(decode_meta(raw.as_ref())?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
|
||||||
|
let meta = self.meta_tree()?;
|
||||||
|
let encoded = encode_meta(arity, next_id);
|
||||||
|
meta.insert(name.as_bytes(), encoded.as_ref())
|
||||||
|
.map_err(backend)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for SledStorage {
|
||||||
|
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
||||||
|
if name == META_TREE {
|
||||||
|
return Err(StorageError::Validation(format!(
|
||||||
|
"relation name '{name}' is reserved"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let meta = self.meta_tree()?;
|
||||||
|
if meta.contains_key(name.as_bytes()).map_err(backend)? {
|
||||||
|
return Err(StorageError::RelationExists(name.to_string()));
|
||||||
|
}
|
||||||
|
let arity_u32 = u32::try_from(arity)
|
||||||
|
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
||||||
|
self.store_meta(name, arity_u32, 0)?;
|
||||||
|
// open_tree creates the tree if it doesn't exist
|
||||||
|
let _ = self.relation_tree(name)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
||||||
|
let (arity, _) = self.load_meta(name)?;
|
||||||
|
Ok(arity as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
||||||
|
let _ = self.load_meta(name)?;
|
||||||
|
let tree = self.relation_tree(name)?;
|
||||||
|
let mut rows = Vec::new();
|
||||||
|
for entry in &tree {
|
||||||
|
let (_, value) = entry.map_err(backend)?;
|
||||||
|
rows.push(decode_row(value.as_ref())?);
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
||||||
|
let (arity, next_id) = self.load_meta(name)?;
|
||||||
|
if row.len() != arity as usize {
|
||||||
|
return Err(StorageError::ArityMismatch {
|
||||||
|
expected: arity as usize,
|
||||||
|
got: row.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let tree = self.relation_tree(name)?;
|
||||||
|
tree.insert(row_key(next_id), encode_row(&row))
|
||||||
|
.map_err(backend)?;
|
||||||
|
self.store_meta(name, arity, next_id + 1)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    /// Open a [`SledStorage`] in a fresh temporary directory. The guard is
    /// returned so the directory outlives the storage within each test.
    fn open_temp() -> Result<(tempfile::TempDir, SledStorage), StorageError> {
        let dir = tempfile::tempdir().map_err(backend)?;
        let storage = SledStorage::open(dir.path())?;
        Ok((dir, storage))
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        let (_dir, mut store) = open_temp()?;
        store.create_relation("edge", 2)?;

        let expected = vec![vec![i(1), i(2)], vec![i(2), i(3)], vec![i(3), i(3)]];
        for row in expected.clone() {
            store.insert("edge", row)?;
        }

        assert_eq!(store.scan("edge")?, expected);
        assert_eq!(store.arity("edge")?, 2);
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let (_dir, mut store) = open_temp()?;
        store.create_relation("edge", 2)?;

        let second_attempt = store.create_relation("edge", 2);
        assert!(matches!(
            second_attempt,
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }

    #[test]
    fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
        let (_dir, mut store) = open_temp()?;
        store.create_relation("edge", 2)?;

        // One cell offered to a two-column relation.
        let outcome = store.insert("edge", vec![i(1)]);
        assert!(matches!(
            outcome,
            Err(StorageError::ArityMismatch {
                expected: 2,
                got: 1,
            })
        ));
        Ok(())
    }
}
|
||||||
Loading…
x
Reference in New Issue
Block a user