Compare commits
No commits in common. "b5721611422efb3748ed6af9624fd40bb97c66c2" and "1848af6d326fc450f620bac46da453ac786ceb06" have entirely different histories.
b572161142
...
1848af6d32
1097
Cargo.lock
generated
1097
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -9,4 +9,3 @@ rust-version.workspace = true
|
|||||||
workspace = true
|
workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
query-storage = { path = "../query-storage" }
|
|
||||||
|
|||||||
@ -9,10 +9,7 @@
|
|||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use query_storage::table::Table;
|
use crate::{relation::Relation, table::Table, value::Value};
|
||||||
use query_storage::value::Value;
|
|
||||||
|
|
||||||
use crate::relation::Relation;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum Term {
|
pub enum Term {
|
||||||
|
|||||||
@ -11,9 +11,7 @@
|
|||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
use query_storage::value::Value;
|
use crate::{relation::Relation, value::Value};
|
||||||
|
|
||||||
use crate::relation::Relation;
|
|
||||||
|
|
||||||
fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> {
|
fn shared_columns(left: &Relation, right: &Relation) -> Vec<(usize, usize)> {
|
||||||
left.columns
|
left.columns
|
||||||
|
|||||||
@ -2,8 +2,8 @@
|
|||||||
//!
|
//!
|
||||||
//! Three operators are in scope:
|
//! Three operators are in scope:
|
||||||
//!
|
//!
|
||||||
//! - [`atom::scan_atom`] scans a [`Table`](query_storage::table::Table) under
|
//! - [`atom::scan_atom`] scans a [`table::Table`] under an
|
||||||
//! an [`atom::AtomPattern`], filtering for repeated-variable equality and
|
//! [`atom::AtomPattern`], filtering for repeated-variable equality and
|
||||||
//! literal equality, and outputs a binding [`relation::Relation`].
|
//! literal equality, and outputs a binding [`relation::Relation`].
|
||||||
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values
|
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values
|
||||||
//! appear in another.
|
//! appear in another.
|
||||||
@ -14,11 +14,10 @@
|
|||||||
//! is just an expression like
|
//! is just an expression like
|
||||||
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
|
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
|
||||||
//!
|
//!
|
||||||
//! Foundational types [`Value`](query_storage::value::Value) and
|
//! Integration with an external query-plan IR is out of scope.
|
||||||
//! [`Table`](query_storage::table::Table) live in `query-storage`, the
|
|
||||||
//! storage-layer crate this crate is built on; storage backends produce
|
|
||||||
//! `Table`s that operators here consume.
|
|
||||||
|
|
||||||
pub mod atom;
|
pub mod atom;
|
||||||
pub mod join;
|
pub mod join;
|
||||||
pub mod relation;
|
pub mod relation;
|
||||||
|
pub mod table;
|
||||||
|
pub mod value;
|
||||||
|
|||||||
@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use query_storage::value::Value;
|
use crate::value::Value;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Relation {
|
pub struct Relation {
|
||||||
|
|||||||
@ -15,8 +15,8 @@
|
|||||||
|
|
||||||
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
||||||
use query_ops::join::{natural_join, semijoin};
|
use query_ops::join::{natural_join, semijoin};
|
||||||
use query_storage::table::Table;
|
use query_ops::table::Table;
|
||||||
use query_storage::value::Value;
|
use query_ops::value::Value;
|
||||||
|
|
||||||
fn s(x: &str) -> Value {
|
fn s(x: &str) -> Value {
|
||||||
Value::Str(x.to_string())
|
Value::Str(x.to_string())
|
||||||
|
|||||||
@ -1,40 +0,0 @@
|
|||||||
//! End-to-end: load rows into [`MemoryStorage`], scan as a [`Table`],
|
|
||||||
//! run [`scan_atom`] against it.
|
|
||||||
//!
|
|
||||||
//! Demonstrates that `query-ops` operators can consume from a storage backend
|
|
||||||
//! through the [`scan_as_table`] bridge, with no changes to `query-ops` itself.
|
|
||||||
|
|
||||||
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
|
||||||
use query_storage::table::Table;
|
|
||||||
use query_storage::value::Value;
|
|
||||||
use query_storage::{MemoryStorage, Storage, StorageError, scan_as_table};
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn var(name: &str) -> Term {
|
|
||||||
Term::Var(name.to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn scan_atom_consumes_from_memory_backend() -> Result<(), StorageError> {
|
|
||||||
let mut storage = MemoryStorage::new();
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(2), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(3), i(3)])?;
|
|
||||||
storage.insert("edge", vec![i(4), i(5)])?;
|
|
||||||
|
|
||||||
let edge_table: Table = scan_as_table(&storage, "edge")?;
|
|
||||||
let self_loops = scan_atom(
|
|
||||||
&edge_table,
|
|
||||||
&AtomPattern {
|
|
||||||
columns: vec![var("X"), var("X")],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(self_loops.columns, vec!["X".to_string()]);
|
|
||||||
assert_eq!(self_loops.rows, vec![vec![i(2)], vec![i(3)]]);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@ -1,30 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "query-storage"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition.workspace = true
|
|
||||||
license.workspace = true
|
|
||||||
rust-version.workspace = true
|
|
||||||
|
|
||||||
[lints.rust]
|
|
||||||
unsafe_code = "deny"
|
|
||||||
|
|
||||||
[lints.clippy]
|
|
||||||
pedantic = "warn"
|
|
||||||
|
|
||||||
[features]
|
|
||||||
default = []
|
|
||||||
lmdb = ["dep:heed"]
|
|
||||||
redb = ["dep:redb"]
|
|
||||||
fjall = ["dep:fjall"]
|
|
||||||
sled = ["dep:sled"]
|
|
||||||
geomerge = ["dep:geomerge"]
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
heed = { version = "0.20", optional = true }
|
|
||||||
redb = { version = "2", optional = true }
|
|
||||||
fjall = { version = "2", optional = true }
|
|
||||||
sled = { version = "0.34", optional = true }
|
|
||||||
geomerge = { path = "../../external/geomerge/crates/geomerge", optional = true }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
|
||||||
tempfile = "3"
|
|
||||||
@ -1,262 +0,0 @@
|
|||||||
//! Wire format shared by every byte-oriented backend in this crate.
|
|
||||||
//!
|
|
||||||
//! The encoding is hand-rolled (no `serde`, no `bincode`) so that the
|
|
||||||
//! generated bytes are stable and inspectable. It is **not** versioned: adding
|
|
||||||
//! a new [`Value`] variant invalidates previously-stored data. That is fine
|
|
||||||
//! for a playground; production code would prepend a format byte.
|
|
||||||
//!
|
|
||||||
//! ## Row Format
|
|
||||||
//!
|
|
||||||
//! `[count: u32 LE] [val × count]`
|
|
||||||
//!
|
|
||||||
//! ## Value Format
|
|
||||||
//!
|
|
||||||
//! `[tag: u8] [payload]`
|
|
||||||
//!
|
|
||||||
//! | Tag | Variant | Payload |
|
|
||||||
//! |--------|---------------|--------------------------------------|
|
|
||||||
//! | `0x00` | `Value::Int` | `i64 LE` (8 bytes) |
|
|
||||||
//! | `0x01` | `Value::Str` | `[len: u32 LE] [bytes]` |
|
|
||||||
//!
|
|
||||||
//! ## Row Key Format
|
|
||||||
//!
|
|
||||||
//! Synthetic row IDs are `u64` encoded big-endian so lexicographic key order
|
|
||||||
//! matches insertion order. Backends with named sub-stores per relation can
|
|
||||||
//! use this directly as the key.
|
|
||||||
//!
|
|
||||||
//! ## Metadata Format
|
|
||||||
//!
|
|
||||||
//! Per-relation metadata is `[arity: u32 LE] [next_id: u64 LE]` = 12 bytes.
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
/// Errors raised by [`decode_row`] and [`decode_meta`].
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum CodecError {
|
|
||||||
/// The byte slice ended before the expected number of fields was read.
|
|
||||||
UnexpectedEof,
|
|
||||||
/// A value tag byte was unrecognized.
|
|
||||||
UnknownTag(u8),
|
|
||||||
/// A length field declared more bytes than the slice contains.
|
|
||||||
LengthOverrun { declared: usize, available: usize },
|
|
||||||
/// A UTF-8 string payload could not be decoded.
|
|
||||||
InvalidUtf8,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for CodecError {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
Self::UnexpectedEof => write!(f, "unexpected end of bytes"),
|
|
||||||
Self::UnknownTag(t) => write!(f, "unknown value tag: 0x{t:02x}"),
|
|
||||||
Self::LengthOverrun {
|
|
||||||
declared,
|
|
||||||
available,
|
|
||||||
} => write!(
|
|
||||||
f,
|
|
||||||
"declared length {declared} exceeds available {available} bytes"
|
|
||||||
),
|
|
||||||
Self::InvalidUtf8 => write!(f, "invalid UTF-8 in string payload"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for CodecError {}
|
|
||||||
|
|
||||||
/// Encode a row of [`Value`]s to bytes.
|
|
||||||
#[must_use]
|
|
||||||
pub fn encode_row(row: &[Value]) -> Vec<u8> {
|
|
||||||
let mut out = Vec::with_capacity(4 + row.len() * 9);
|
|
||||||
out.extend_from_slice(&u32::try_from(row.len()).unwrap_or(u32::MAX).to_le_bytes());
|
|
||||||
for value in row {
|
|
||||||
match value {
|
|
||||||
Value::Int(i) => {
|
|
||||||
out.push(0x00);
|
|
||||||
out.extend_from_slice(&i.to_le_bytes());
|
|
||||||
}
|
|
||||||
Value::Str(s) => {
|
|
||||||
out.push(0x01);
|
|
||||||
let bytes = s.as_bytes();
|
|
||||||
out.extend_from_slice(
|
|
||||||
&u32::try_from(bytes.len()).unwrap_or(u32::MAX).to_le_bytes(),
|
|
||||||
);
|
|
||||||
out.extend_from_slice(bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
out
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Decode a row of [`Value`]s from bytes.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`CodecError`] if the byte slice is malformed.
|
|
||||||
pub fn decode_row(mut bytes: &[u8]) -> Result<Vec<Value>, CodecError> {
|
|
||||||
let count = read_u32(&mut bytes)? as usize;
|
|
||||||
let mut row = Vec::with_capacity(count);
|
|
||||||
for _ in 0..count {
|
|
||||||
row.push(read_value(&mut bytes)?);
|
|
||||||
}
|
|
||||||
Ok(row)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_value(bytes: &mut &[u8]) -> Result<Value, CodecError> {
|
|
||||||
let tag = read_u8(bytes)?;
|
|
||||||
match tag {
|
|
||||||
0x00 => {
|
|
||||||
let i = read_i64(bytes)?;
|
|
||||||
Ok(Value::Int(i))
|
|
||||||
}
|
|
||||||
0x01 => {
|
|
||||||
let len = read_u32(bytes)? as usize;
|
|
||||||
if bytes.len() < len {
|
|
||||||
return Err(CodecError::LengthOverrun {
|
|
||||||
declared: len,
|
|
||||||
available: bytes.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let (head, tail) = bytes.split_at(len);
|
|
||||||
*bytes = tail;
|
|
||||||
let s = std::str::from_utf8(head)
|
|
||||||
.map_err(|_| CodecError::InvalidUtf8)?
|
|
||||||
.to_string();
|
|
||||||
Ok(Value::Str(s))
|
|
||||||
}
|
|
||||||
other => Err(CodecError::UnknownTag(other)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_u8(bytes: &mut &[u8]) -> Result<u8, CodecError> {
|
|
||||||
let (head, tail) = bytes.split_first().ok_or(CodecError::UnexpectedEof)?;
|
|
||||||
*bytes = tail;
|
|
||||||
Ok(*head)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_u32(bytes: &mut &[u8]) -> Result<u32, CodecError> {
|
|
||||||
if bytes.len() < 4 {
|
|
||||||
return Err(CodecError::UnexpectedEof);
|
|
||||||
}
|
|
||||||
let (head, tail) = bytes.split_at(4);
|
|
||||||
*bytes = tail;
|
|
||||||
let mut buf = [0u8; 4];
|
|
||||||
buf.copy_from_slice(head);
|
|
||||||
Ok(u32::from_le_bytes(buf))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_u64(bytes: &mut &[u8]) -> Result<u64, CodecError> {
|
|
||||||
if bytes.len() < 8 {
|
|
||||||
return Err(CodecError::UnexpectedEof);
|
|
||||||
}
|
|
||||||
let (head, tail) = bytes.split_at(8);
|
|
||||||
*bytes = tail;
|
|
||||||
let mut buf = [0u8; 8];
|
|
||||||
buf.copy_from_slice(head);
|
|
||||||
Ok(u64::from_le_bytes(buf))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_i64(bytes: &mut &[u8]) -> Result<i64, CodecError> {
|
|
||||||
if bytes.len() < 8 {
|
|
||||||
return Err(CodecError::UnexpectedEof);
|
|
||||||
}
|
|
||||||
let (head, tail) = bytes.split_at(8);
|
|
||||||
*bytes = tail;
|
|
||||||
let mut buf = [0u8; 8];
|
|
||||||
buf.copy_from_slice(head);
|
|
||||||
Ok(i64::from_le_bytes(buf))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Encode a row key from a synthetic u64 ID.
|
|
||||||
///
|
|
||||||
/// Big-endian so lexicographic key order matches insertion order.
|
|
||||||
#[must_use]
|
|
||||||
pub fn row_key(id: u64) -> [u8; 8] {
|
|
||||||
id.to_be_bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Encode per-relation metadata: arity and next row ID.
|
|
||||||
#[must_use]
|
|
||||||
pub fn encode_meta(arity: u32, next_id: u64) -> [u8; 12] {
|
|
||||||
let mut out = [0u8; 12];
|
|
||||||
out[0..4].copy_from_slice(&arity.to_le_bytes());
|
|
||||||
out[4..12].copy_from_slice(&next_id.to_le_bytes());
|
|
||||||
out
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Decode per-relation metadata.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`CodecError::UnexpectedEof`] if the slice is shorter than 12 bytes.
|
|
||||||
pub fn decode_meta(mut bytes: &[u8]) -> Result<(u32, u64), CodecError> {
|
|
||||||
let arity = read_u32(&mut bytes)?;
|
|
||||||
let next_id = read_u64(&mut bytes)?;
|
|
||||||
Ok((arity, next_id))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn s(x: &str) -> Value {
|
|
||||||
Value::Str(x.to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn encode_decode_int_only_row() -> Result<(), CodecError> {
|
|
||||||
let row = vec![i(1), i(-2), i(i64::MAX)];
|
|
||||||
let bytes = encode_row(&row);
|
|
||||||
let decoded = decode_row(&bytes)?;
|
|
||||||
assert_eq!(decoded, row);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn encode_decode_mixed_row() -> Result<(), CodecError> {
|
|
||||||
let row = vec![s("Alice"), i(42), s("a longer string with spaces")];
|
|
||||||
let bytes = encode_row(&row);
|
|
||||||
let decoded = decode_row(&bytes)?;
|
|
||||||
assert_eq!(decoded, row);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn encode_decode_empty_row() -> Result<(), CodecError> {
|
|
||||||
let bytes = encode_row(&[]);
|
|
||||||
let decoded = decode_row(&bytes)?;
|
|
||||||
assert!(decoded.is_empty());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn decode_unknown_tag_fails() {
|
|
||||||
let bytes = vec![1, 0, 0, 0, 0xFF];
|
|
||||||
assert!(matches!(
|
|
||||||
decode_row(&bytes),
|
|
||||||
Err(CodecError::UnknownTag(0xFF))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn decode_truncated_fails() {
|
|
||||||
let bytes = vec![1, 0, 0, 0, 0x00, 0x01];
|
|
||||||
assert!(matches!(decode_row(&bytes), Err(CodecError::UnexpectedEof)));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn row_key_preserves_order() {
|
|
||||||
assert!(row_key(1) < row_key(2));
|
|
||||||
assert!(row_key(255) < row_key(256));
|
|
||||||
assert!(row_key(u64::MAX - 1) < row_key(u64::MAX));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn meta_roundtrip() -> Result<(), CodecError> {
|
|
||||||
let encoded = encode_meta(3, 12345);
|
|
||||||
let (arity, next_id) = decode_meta(&encoded)?;
|
|
||||||
assert_eq!(arity, 3);
|
|
||||||
assert_eq!(next_id, 12345);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,149 +0,0 @@
|
|||||||
//! fjall adapter.
|
|
||||||
//!
|
|
||||||
//! Each relation gets a fjall [`PartitionHandle`](fjall::PartitionHandle) of
|
|
||||||
//! the same name. A reserved partition named `__meta` carries per-relation
|
|
||||||
//! metadata (arity and next synthetic row ID).
|
|
||||||
|
|
||||||
use fjall::{Keyspace, PartitionCreateOptions, PartitionHandle};
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
const META_PARTITION: &str = "__meta";
|
|
||||||
|
|
||||||
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
|
||||||
StorageError::Backend(Box::new(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// fjall-backed [`Storage`] implementation.
|
|
||||||
pub struct FjallStorage {
|
|
||||||
keyspace: Keyspace,
|
|
||||||
meta: PartitionHandle,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FjallStorage {
|
|
||||||
/// Open or create a fjall keyspace at `path`.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::Backend`] if fjall fails to open the path.
|
|
||||||
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
|
||||||
let keyspace = fjall::Config::new(path).open().map_err(backend)?;
|
|
||||||
let meta = keyspace
|
|
||||||
.open_partition(META_PARTITION, PartitionCreateOptions::default())
|
|
||||||
.map_err(backend)?;
|
|
||||||
Ok(Self { keyspace, meta })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn relation_partition(&self, name: &str) -> Result<PartitionHandle, StorageError> {
|
|
||||||
self.keyspace
|
|
||||||
.open_partition(name, PartitionCreateOptions::default())
|
|
||||||
.map_err(backend)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
|
|
||||||
let raw = self
|
|
||||||
.meta
|
|
||||||
.get(name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
Ok(decode_meta(raw.as_ref())?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
|
|
||||||
self.meta
|
|
||||||
.insert(name.as_bytes(), encode_meta(arity, next_id))
|
|
||||||
.map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for FjallStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if name == META_PARTITION {
|
|
||||||
return Err(StorageError::Validation(format!(
|
|
||||||
"relation name '{name}' is reserved"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
if self.meta.contains_key(name.as_bytes()).map_err(backend)? {
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
let arity_u32 = u32::try_from(arity)
|
|
||||||
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
|
||||||
self.store_meta(name, arity_u32, 0)?;
|
|
||||||
let _ = self.relation_partition(name)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
let (arity, _) = self.load_meta(name)?;
|
|
||||||
Ok(arity as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
let _ = self.load_meta(name)?;
|
|
||||||
let partition = self.relation_partition(name)?;
|
|
||||||
let mut rows = Vec::new();
|
|
||||||
for entry in partition.iter() {
|
|
||||||
let (_, value) = entry.map_err(backend)?;
|
|
||||||
rows.push(decode_row(value.as_ref())?);
|
|
||||||
}
|
|
||||||
Ok(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let (arity, next_id) = self.load_meta(name)?;
|
|
||||||
if row.len() != arity as usize {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: arity as usize,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let partition = self.relation_partition(name)?;
|
|
||||||
partition
|
|
||||||
.insert(row_key(next_id), encode_row(&row))
|
|
||||||
.map_err(backend)?;
|
|
||||||
self.store_meta(name, arity, next_id + 1)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_temp() -> Result<FjallStorage, StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
let storage = FjallStorage::open(dir.path())?;
|
|
||||||
std::mem::forget(dir);
|
|
||||||
Ok(storage)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(2), i(3)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
|
|
||||||
assert_eq!(storage.arity("edge")?, 2);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,252 +0,0 @@
|
|||||||
//! Geomerge adapter.
|
|
||||||
//!
|
|
||||||
//! Unlike the other backends, geomerge schemas are **immutable after store
|
|
||||||
//! construction**: there is no public API to register a new table on a live
|
|
||||||
//! `Store`. The adapter therefore expects all relations to be declared up
|
|
||||||
//! front via a `FlatTheory` passed to [`GeomergeStorage::from_theory`], and
|
|
||||||
//! [`Storage::create_relation`] becomes a verifier that the relation exists
|
|
||||||
//! in the loaded theory and that its arity matches.
|
|
||||||
//!
|
|
||||||
//! Additional v1 mismatches with the trait:
|
|
||||||
//!
|
|
||||||
//! - Column types are typed (`PrimInt` / `PrimString`) in geomerge but the
|
|
||||||
//! trait's `create_relation` only carries `arity`. The adapter cannot
|
|
||||||
//! declare a relation at runtime, so this issue surfaces only at insert
|
|
||||||
//! time when geomerge rejects a row with `StorageError::Validation`.
|
|
||||||
//! - Cells of type `CellValue::Id` cannot be represented in our `Value` enum.
|
|
||||||
//! Scanning a table that contains such cells returns `StorageError::Validation`.
|
|
||||||
//! - Every `insert` opens a fresh `Transaction` and commits. Law violations
|
|
||||||
//! surface at commit time, not at the `add` call.
|
|
||||||
|
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use geomerge::ir::{self, Path};
|
|
||||||
use geomerge::store::Store;
|
|
||||||
use geomerge::table::CellValue;
|
|
||||||
use geomerge::txn::ops::TxnCellValue;
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
|
||||||
StorageError::Backend(Box::new(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn validation(msg: impl Into<String>) -> StorageError {
|
|
||||||
StorageError::Validation(msg.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Geomerge-backed [`Storage`] implementation.
|
|
||||||
///
|
|
||||||
/// Construct via [`GeomergeStorage::new`] (empty store, no relations) or
|
|
||||||
/// [`GeomergeStorage::from_theory`] (preloaded with a `FlatTheory`).
|
|
||||||
pub struct GeomergeStorage {
|
|
||||||
store: Store,
|
|
||||||
declared: HashSet<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for GeomergeStorage {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl GeomergeStorage {
|
|
||||||
/// Build an empty store. No relations are available until the store is
|
|
||||||
/// rebuilt via a theory.
|
|
||||||
#[must_use]
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
store: Store::new(),
|
|
||||||
declared: HashSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Build a store from a pre-defined `FlatTheory`. All `create_relation`
|
|
||||||
/// calls must reference relations declared in the theory.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::Backend`] if geomerge rejects the theory.
|
|
||||||
pub fn from_theory(theory: ir::FlatTheory) -> Result<Self, StorageError> {
|
|
||||||
let store = Store::try_from_theory(theory).map_err(|e| backend(*e))?;
|
|
||||||
Ok(Self {
|
|
||||||
store,
|
|
||||||
declared: HashSet::new(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for GeomergeStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if self.declared.contains(name) {
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
let path: Path = name.into();
|
|
||||||
let table = self.store.table_at(&path).ok_or_else(|| {
|
|
||||||
validation(format!(
|
|
||||||
"relation '{name}' is not declared in the loaded geomerge theory; \
|
|
||||||
geomerge does not support runtime relation creation"
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
let declared_arity = table.schema().columns.len();
|
|
||||||
if declared_arity != arity {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: declared_arity,
|
|
||||||
got: arity,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
self.declared.insert(name.to_string());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
let path: Path = name.into();
|
|
||||||
let table = self
|
|
||||||
.store
|
|
||||||
.table_at(&path)
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
Ok(table.schema().columns.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
let path: Path = name.into();
|
|
||||||
let table = self
|
|
||||||
.store
|
|
||||||
.table_at(&path)
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
let arity = table.schema().columns.len();
|
|
||||||
let mut rows = Vec::with_capacity(table.row_count());
|
|
||||||
for r in 0..table.row_count() {
|
|
||||||
let mut row = Vec::with_capacity(arity);
|
|
||||||
for c in 0..arity {
|
|
||||||
let cell = table
|
|
||||||
.cell_at(r, c)
|
|
||||||
.ok_or_else(|| validation(format!("missing cell at ({r}, {c}) in '{name}'")))?;
|
|
||||||
row.push(cell_to_value(cell)?);
|
|
||||||
}
|
|
||||||
rows.push(row);
|
|
||||||
}
|
|
||||||
Ok(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let path: Path = name.into();
|
|
||||||
let arity = self.arity(name)?;
|
|
||||||
if row.len() != arity {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: arity,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let values: Vec<TxnCellValue> = row.into_iter().map(value_to_txn_cell).collect();
|
|
||||||
let mut txn = self.store.transaction();
|
|
||||||
txn.add(&path, values)
|
|
||||||
.map_err(|e| validation(e.to_string()))?;
|
|
||||||
// Law violations surface here at commit time, not at add time.
|
|
||||||
txn.commit().map_err(|e| validation(e.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cell_to_value(cell: &CellValue) -> Result<Value, StorageError> {
|
|
||||||
match cell {
|
|
||||||
CellValue::Int(i) => Ok(Value::Int(*i)),
|
|
||||||
CellValue::Str(s) => Ok(Value::Str(s.clone())),
|
|
||||||
CellValue::Id(_) => Err(validation(
|
|
||||||
"geomerge CellValue::Id cannot be represented in the playground's Value enum",
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn value_to_txn_cell(value: Value) -> TxnCellValue {
|
|
||||||
match value {
|
|
||||||
Value::Int(i) => TxnCellValue::Int(i),
|
|
||||||
Value::Str(s) => TxnCellValue::Str(s),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use geomerge::ir::{ColType, FlatTheory, PrimType, Schema, TableEntry};
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn int_schema(arity: usize) -> Schema {
|
|
||||||
Schema {
|
|
||||||
columns: (0..arity)
|
|
||||||
.map(|_| ColType::PrimType {
|
|
||||||
prim: PrimType::PrimInt,
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
primary_key: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn theory_with_one_int_table(name: &str, arity: usize) -> FlatTheory {
|
|
||||||
FlatTheory {
|
|
||||||
tables: vec![TableEntry {
|
|
||||||
path: name.into(),
|
|
||||||
table: int_schema(arity),
|
|
||||||
}],
|
|
||||||
laws: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn empty_store_has_no_relations() {
|
|
||||||
let storage = GeomergeStorage::new();
|
|
||||||
assert!(matches!(
|
|
||||||
storage.arity("edge"),
|
|
||||||
Err(StorageError::RelationNotFound(_))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_relation_on_undeclared_returns_validation_error() {
|
|
||||||
let mut storage = GeomergeStorage::new();
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::Validation(_))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn theory_loaded_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let theory = theory_with_one_int_table("edge", 2);
|
|
||||||
let mut storage = GeomergeStorage::from_theory(theory)?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(3), i(4)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(3), i(4)]]);
|
|
||||||
assert_eq!(storage.arity("edge")?, 2);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let theory = theory_with_one_int_table("edge", 2);
|
|
||||||
let mut storage = GeomergeStorage::from_theory(theory)?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn insert_wrong_type_returns_validation_error() -> Result<(), StorageError> {
|
|
||||||
let theory = theory_with_one_int_table("edge", 2);
|
|
||||||
let mut storage = GeomergeStorage::from_theory(theory)?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
// Insert a Str into an Int column: geomerge rejects it.
|
|
||||||
let result = storage.insert("edge", vec![Value::Str("not an int".to_string()), i(2)]);
|
|
||||||
assert!(matches!(result, Err(StorageError::Validation(_))));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,145 +0,0 @@
|
|||||||
//! Storage layer for the query-plan playground.
|
|
||||||
//!
|
|
||||||
//! This is the foundational crate of the workspace. It owns the [`Value`] cell
|
|
||||||
//! type and the [`Table`] container, defines the [`Storage`] trait, and ships
|
|
||||||
//! adapters for several backends behind Cargo features. Higher-level crates
|
|
||||||
//! such as `query-ops` depend on this crate for both the types and the trait.
|
|
||||||
//!
|
|
||||||
//! The v1 trait surface is deliberately narrow: create a relation, scan all
|
|
||||||
//! rows, insert a row, ask for arity. Transactions, range scans, deletes, and
|
|
||||||
//! delta streams are not modeled yet, and will be added when a specific
|
|
||||||
//! experiment demands them.
|
|
||||||
//!
|
|
||||||
//! ## Backends
|
|
||||||
//!
|
|
||||||
//! [`MemoryStorage`] is always available. Other backends are gated behind
|
|
||||||
//! Cargo features so users only pay for what they need:
|
|
||||||
//!
|
|
||||||
//! - `lmdb` — LMDB via the `heed` crate
|
|
||||||
//! - `redb` — pure-Rust embedded KV
|
|
||||||
//! - `fjall` — pure-Rust LSM-tree
|
|
||||||
//! - `sled` — pure-Rust LSM-tree
|
|
||||||
//! - `geomerge` — the workspace's `geomerge` crate
|
|
||||||
|
|
||||||
use crate::table::Table;
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
pub mod codec;
|
|
||||||
pub mod memory;
|
|
||||||
pub mod table;
|
|
||||||
pub mod value;
|
|
||||||
|
|
||||||
#[cfg(feature = "sled")]
|
|
||||||
pub mod sled;
|
|
||||||
|
|
||||||
#[cfg(feature = "redb")]
|
|
||||||
pub mod redb;
|
|
||||||
|
|
||||||
#[cfg(feature = "fjall")]
|
|
||||||
pub mod fjall;
|
|
||||||
|
|
||||||
#[cfg(feature = "lmdb")]
|
|
||||||
pub mod lmdb;
|
|
||||||
|
|
||||||
#[cfg(feature = "geomerge")]
|
|
||||||
pub mod geomerge;
|
|
||||||
|
|
||||||
pub use memory::MemoryStorage;
|
|
||||||
|
|
||||||
/// Errors returned by a [`Storage`] backend.
|
|
||||||
///
|
|
||||||
/// Backend-specific failures (LMDB transaction aborts, sled I/O errors, etc.)
|
|
||||||
/// are wrapped in [`StorageError::Backend`].
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum StorageError {
|
|
||||||
/// No relation with the given name exists in this backend.
|
|
||||||
RelationNotFound(String),
|
|
||||||
/// A relation with the given name already exists.
|
|
||||||
RelationExists(String),
|
|
||||||
/// A row was offered with the wrong number of columns.
|
|
||||||
ArityMismatch { expected: usize, got: usize },
|
|
||||||
/// A backend-defined validation rule rejected the operation, for example
|
|
||||||
/// a `geomerge` law violation.
|
|
||||||
Validation(String),
|
|
||||||
/// A row decoded from storage was malformed.
|
|
||||||
Decode(codec::CodecError),
|
|
||||||
/// A backend-specific error wrapped for transport across the trait.
|
|
||||||
Backend(Box<dyn std::error::Error + Send + Sync>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for StorageError {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
Self::RelationNotFound(name) => write!(f, "relation not found: {name}"),
|
|
||||||
Self::RelationExists(name) => write!(f, "relation already exists: {name}"),
|
|
||||||
Self::ArityMismatch { expected, got } => {
|
|
||||||
write!(f, "arity mismatch: expected {expected}, got {got}")
|
|
||||||
}
|
|
||||||
Self::Validation(msg) => write!(f, "validation failed: {msg}"),
|
|
||||||
Self::Decode(err) => write!(f, "decode error: {err}"),
|
|
||||||
Self::Backend(err) => write!(f, "backend error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for StorageError {
|
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
||||||
match self {
|
|
||||||
Self::Backend(err) => Some(err.as_ref()),
|
|
||||||
Self::Decode(err) => Some(err),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<codec::CodecError> for StorageError {
|
|
||||||
fn from(err: codec::CodecError) -> Self {
|
|
||||||
Self::Decode(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Backend-agnostic interface for storing and retrieving rows.
|
|
||||||
///
|
|
||||||
/// Each relation has a fixed name, a fixed arity (row width), and an ordered
|
|
||||||
/// collection of rows whose cells are [`Value`]s. Concrete implementations
|
|
||||||
/// include [`MemoryStorage`] in this crate plus the feature-gated backends.
|
|
||||||
pub trait Storage {
|
|
||||||
/// Create a new relation with the given name and arity.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::RelationExists`] if a relation with the given
|
|
||||||
/// name already exists.
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError>;
|
|
||||||
|
|
||||||
/// Return the arity of the given relation.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError>;
|
|
||||||
|
|
||||||
/// Scan all rows of the given relation in storage order.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::RelationNotFound`] if no such relation exists.
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError>;
|
|
||||||
|
|
||||||
/// Append a row to the given relation.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::RelationNotFound`] if no such relation exists,
|
|
||||||
/// [`StorageError::ArityMismatch`] if the row's length differs from the
|
|
||||||
/// declared arity, or [`StorageError::Validation`] / [`StorageError::Backend`]
|
|
||||||
/// if a backend-specific rule rejects the row.
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Materialize a relation from a [`Storage`] backend as a [`Table`] that
|
|
||||||
/// query-language operators can consume.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns any error produced by [`Storage::arity`] or [`Storage::scan`].
|
|
||||||
pub fn scan_as_table(storage: &dyn Storage, name: &str) -> Result<Table, StorageError> {
|
|
||||||
let arity = storage.arity(name)?;
|
|
||||||
let rows = storage.scan(name)?;
|
|
||||||
Ok(Table::from_rows(arity, rows))
|
|
||||||
}
|
|
||||||
@ -1,201 +0,0 @@
|
|||||||
//! LMDB adapter via the `heed` crate.
|
|
||||||
//!
|
|
||||||
//! Maps each relation onto a named LMDB sub-database of the same name. A
|
|
||||||
//! reserved sub-database named `__meta` carries per-relation metadata (arity
|
|
||||||
//! and next synthetic row ID).
|
|
||||||
//!
|
|
||||||
//! Note: every [`Storage::insert`] opens its own write transaction. LMDB
|
|
||||||
//! serializes writers across the env, so per-row inserts will be slow on real
|
|
||||||
//! workloads. The v1 trait does not yet expose batch inserts.
|
|
||||||
|
|
||||||
use heed::types::Bytes;
|
|
||||||
use heed::{Database, Env, EnvOpenOptions};
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
const META_DB: &str = "__meta";
|
|
||||||
const DEFAULT_MAX_DBS: u32 = 128;
|
|
||||||
const DEFAULT_MAP_SIZE: usize = 100 * 1024 * 1024;
|
|
||||||
|
|
||||||
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
|
||||||
StorageError::Backend(Box::new(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// LMDB-backed [`Storage`] implementation.
|
|
||||||
pub struct LmdbStorage {
|
|
||||||
env: Env,
|
|
||||||
meta: Database<Bytes, Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LmdbStorage {
|
|
||||||
/// Open or create an LMDB environment at `path`.
|
|
||||||
///
|
|
||||||
/// The path must already exist as a directory; LMDB will create its data
|
|
||||||
/// files inside it.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::Backend`] if LMDB fails to open.
|
|
||||||
///
|
|
||||||
/// # Safety
|
|
||||||
/// Internally uses `EnvOpenOptions::open`, which heed marks `unsafe`
|
|
||||||
/// because the memory-mapped file's contents can be modified by other
|
|
||||||
/// processes. The adapter assumes single-process exclusive access.
|
|
||||||
#[allow(unsafe_code)]
|
|
||||||
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
|
||||||
// SAFETY: heed marks `open` unsafe because the mmap'd file's contents
|
|
||||||
// can be modified by other processes, violating Rust's aliasing rules.
|
|
||||||
// This adapter assumes single-process exclusive access to the path,
|
|
||||||
// which holds for tests and typical playground use.
|
|
||||||
let env = unsafe {
|
|
||||||
EnvOpenOptions::new()
|
|
||||||
.max_dbs(DEFAULT_MAX_DBS)
|
|
||||||
.map_size(DEFAULT_MAP_SIZE)
|
|
||||||
.open(path)
|
|
||||||
.map_err(backend)?
|
|
||||||
};
|
|
||||||
let mut wtxn = env.write_txn().map_err(backend)?;
|
|
||||||
let meta: Database<Bytes, Bytes> = env
|
|
||||||
.create_database(&mut wtxn, Some(META_DB))
|
|
||||||
.map_err(backend)?;
|
|
||||||
wtxn.commit().map_err(backend)?;
|
|
||||||
Ok(Self { env, meta })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_relation_db(
|
|
||||||
&self,
|
|
||||||
wtxn: &mut heed::RwTxn,
|
|
||||||
name: &str,
|
|
||||||
) -> Result<Database<Bytes, Bytes>, StorageError> {
|
|
||||||
self.env.create_database(wtxn, Some(name)).map_err(backend)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for LmdbStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if name == META_DB {
|
|
||||||
return Err(StorageError::Validation(format!(
|
|
||||||
"relation name '{name}' is reserved"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
let arity_u32 = u32::try_from(arity)
|
|
||||||
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
|
||||||
let mut wtxn = self.env.write_txn().map_err(backend)?;
|
|
||||||
if self
|
|
||||||
.meta
|
|
||||||
.get(&wtxn, name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.is_some()
|
|
||||||
{
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
let encoded = encode_meta(arity_u32, 0);
|
|
||||||
self.meta
|
|
||||||
.put(&mut wtxn, name.as_bytes(), &encoded[..])
|
|
||||||
.map_err(backend)?;
|
|
||||||
let _ = self.open_relation_db(&mut wtxn, name)?;
|
|
||||||
wtxn.commit().map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
let rtxn = self.env.read_txn().map_err(backend)?;
|
|
||||||
let raw = self
|
|
||||||
.meta
|
|
||||||
.get(&rtxn, name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
let (arity, _) = decode_meta(raw)?;
|
|
||||||
Ok(arity as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
let rtxn = self.env.read_txn().map_err(backend)?;
|
|
||||||
if self
|
|
||||||
.meta
|
|
||||||
.get(&rtxn, name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
return Err(StorageError::RelationNotFound(name.to_string()));
|
|
||||||
}
|
|
||||||
let db: Database<Bytes, Bytes> = self
|
|
||||||
.env
|
|
||||||
.open_database(&rtxn, Some(name))
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
let mut rows = Vec::new();
|
|
||||||
for entry in db.iter(&rtxn).map_err(backend)? {
|
|
||||||
let (_, value) = entry.map_err(backend)?;
|
|
||||||
rows.push(decode_row(value)?);
|
|
||||||
}
|
|
||||||
Ok(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let mut wtxn = self.env.write_txn().map_err(backend)?;
|
|
||||||
let meta_bytes = self
|
|
||||||
.meta
|
|
||||||
.get(&wtxn, name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
let (arity, next_id) = decode_meta(meta_bytes)?;
|
|
||||||
if row.len() != arity as usize {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: arity as usize,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let db = self.open_relation_db(&mut wtxn, name)?;
|
|
||||||
let key = row_key(next_id);
|
|
||||||
let value = encode_row(&row);
|
|
||||||
db.put(&mut wtxn, &key[..], &value[..]).map_err(backend)?;
|
|
||||||
let new_meta = encode_meta(arity, next_id + 1);
|
|
||||||
self.meta
|
|
||||||
.put(&mut wtxn, name.as_bytes(), &new_meta[..])
|
|
||||||
.map_err(backend)?;
|
|
||||||
wtxn.commit().map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_temp() -> Result<LmdbStorage, StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
let storage = LmdbStorage::open(dir.path())?;
|
|
||||||
std::mem::forget(dir);
|
|
||||||
Ok(storage)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(2), i(3)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
|
|
||||||
assert_eq!(storage.arity("edge")?, 2);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,147 +0,0 @@
|
|||||||
//! In-memory backend, keyed by relation name. Always available.
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
/// In-memory backend, useful as the default in tests and as a correctness
|
|
||||||
/// oracle for other backends.
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct MemoryStorage {
|
|
||||||
relations: HashMap<String, MemoryRelation>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct MemoryRelation {
|
|
||||||
arity: usize,
|
|
||||||
rows: Vec<Vec<Value>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MemoryStorage {
|
|
||||||
#[must_use]
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for MemoryStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if self.relations.contains_key(name) {
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
self.relations.insert(
|
|
||||||
name.to_string(),
|
|
||||||
MemoryRelation {
|
|
||||||
arity,
|
|
||||||
rows: Vec::new(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
self.relations
|
|
||||||
.get(name)
|
|
||||||
.map(|r| r.arity)
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
self.relations
|
|
||||||
.get(name)
|
|
||||||
.map(|r| r.rows.clone())
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let relation = self
|
|
||||||
.relations
|
|
||||||
.get_mut(name)
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
if row.len() != relation.arity {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: relation.arity,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
relation.rows.push(row);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use crate::scan_as_table;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let mut storage = MemoryStorage::new();
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(2), i(3)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(rows, vec![vec![i(1), i(2)], vec![i(2), i(3)]]);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = MemoryStorage::new();
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn scan_unknown_relation_returns_err() {
|
|
||||||
let storage = MemoryStorage::new();
|
|
||||||
assert!(matches!(
|
|
||||||
storage.scan("missing"),
|
|
||||||
Err(StorageError::RelationNotFound(_))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn arity_unknown_relation_returns_err() {
|
|
||||||
let storage = MemoryStorage::new();
|
|
||||||
assert!(matches!(
|
|
||||||
storage.arity("missing"),
|
|
||||||
Err(StorageError::RelationNotFound(_))
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = MemoryStorage::new();
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.insert("edge", vec![i(1)]),
|
|
||||||
Err(StorageError::ArityMismatch {
|
|
||||||
expected: 2,
|
|
||||||
got: 1
|
|
||||||
})
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn scan_as_table_materializes_table() -> Result<(), StorageError> {
|
|
||||||
let mut storage = MemoryStorage::new();
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
let table = scan_as_table(&storage, "edge")?;
|
|
||||||
assert_eq!(table.arity, 2);
|
|
||||||
assert_eq!(table.rows, vec![vec![i(1), i(2)]]);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,183 +0,0 @@
|
|||||||
//! redb adapter.
|
|
||||||
//!
|
|
||||||
//! Each relation gets a redb table named after it, keyed by `u64` row IDs.
|
|
||||||
//! A reserved table named `__meta`, keyed by relation name, carries per-relation
|
|
||||||
//! metadata (arity and next synthetic row ID).
|
|
||||||
|
|
||||||
use redb::{Database, ReadableTable, TableDefinition};
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row};
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
const META_TABLE: &str = "__meta";
|
|
||||||
|
|
||||||
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
|
||||||
StorageError::Backend(Box::new(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn meta_def() -> TableDefinition<'static, &'static str, &'static [u8]> {
|
|
||||||
TableDefinition::new(META_TABLE)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn rows_def(name: &str) -> TableDefinition<'_, u64, &'static [u8]> {
|
|
||||||
TableDefinition::new(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// redb-backed [`Storage`] implementation.
|
|
||||||
pub struct RedbStorage {
|
|
||||||
db: Database,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RedbStorage {
|
|
||||||
/// Open or create a redb database at `path`.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::Backend`] if redb fails to open the file.
|
|
||||||
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
|
||||||
let db = Database::create(path).map_err(backend)?;
|
|
||||||
Ok(Self { db })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for RedbStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if name == META_TABLE {
|
|
||||||
return Err(StorageError::Validation(format!(
|
|
||||||
"relation name '{name}' is reserved"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
let arity_u32 = u32::try_from(arity)
|
|
||||||
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
|
||||||
let txn = self.db.begin_write().map_err(backend)?;
|
|
||||||
{
|
|
||||||
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
|
|
||||||
if meta.get(name).map_err(backend)?.is_some() {
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
let encoded = encode_meta(arity_u32, 0);
|
|
||||||
meta.insert(name, &encoded[..]).map_err(backend)?;
|
|
||||||
// open_table creates the rows table if it does not exist
|
|
||||||
let _ = txn.open_table(rows_def(name)).map_err(backend)?;
|
|
||||||
}
|
|
||||||
txn.commit().map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
let txn = self.db.begin_read().map_err(backend)?;
|
|
||||||
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
|
||||||
let raw = meta
|
|
||||||
.get(name)
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
let (arity, _) = decode_meta(raw.value())?;
|
|
||||||
Ok(arity as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
let txn = self.db.begin_read().map_err(backend)?;
|
|
||||||
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
|
||||||
if meta.get(name).map_err(backend)?.is_none() {
|
|
||||||
return Err(StorageError::RelationNotFound(name.to_string()));
|
|
||||||
}
|
|
||||||
let table = txn.open_table(rows_def(name)).map_err(backend)?;
|
|
||||||
let mut rows = Vec::new();
|
|
||||||
for entry in table.iter().map_err(backend)? {
|
|
||||||
let (_, value) = entry.map_err(backend)?;
|
|
||||||
rows.push(decode_row(value.value())?);
|
|
||||||
}
|
|
||||||
Ok(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let txn = self.db.begin_write().map_err(backend)?;
|
|
||||||
let (arity, next_id) = {
|
|
||||||
let meta = txn.open_table(meta_def()).map_err(backend)?;
|
|
||||||
let entry = meta
|
|
||||||
.get(name)
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
decode_meta(entry.value())?
|
|
||||||
};
|
|
||||||
if row.len() != arity as usize {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: arity as usize,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let mut rows = txn.open_table(rows_def(name)).map_err(backend)?;
|
|
||||||
let encoded = encode_row(&row);
|
|
||||||
rows.insert(next_id, &encoded[..]).map_err(backend)?;
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let mut meta = txn.open_table(meta_def()).map_err(backend)?;
|
|
||||||
let new_meta = encode_meta(arity, next_id + 1);
|
|
||||||
meta.insert(name, new_meta.as_ref()).map_err(backend)?;
|
|
||||||
}
|
|
||||||
txn.commit().map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn s(x: &str) -> Value {
|
|
||||||
Value::Str(x.to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn open_temp() -> Result<RedbStorage, StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
// The file does not have to exist for redb::create.
|
|
||||||
let path = dir.path().join("test.redb");
|
|
||||||
let storage = RedbStorage::open(&path)?;
|
|
||||||
// Keep the tempdir alive by leaking it (test-only).
|
|
||||||
std::mem::forget(dir);
|
|
||||||
Ok(storage)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![s("hello"), i(7)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(rows, vec![vec![i(1), i(2)], vec![s("hello"), i(7)]]);
|
|
||||||
assert_eq!(storage.arity("edge")?, 2);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
|
|
||||||
let mut storage = open_temp()?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.insert("edge", vec![i(1)]),
|
|
||||||
Err(StorageError::ArityMismatch {
|
|
||||||
expected: 2,
|
|
||||||
got: 1,
|
|
||||||
})
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,161 +0,0 @@
|
|||||||
//! Sled adapter.
|
|
||||||
//!
|
|
||||||
//! Maps each relation onto a sled [`Tree`](sled::Tree) of the same name. A
|
|
||||||
//! reserved tree named `__meta` carries per-relation metadata (arity and the
|
|
||||||
//! next synthetic row ID).
|
|
||||||
|
|
||||||
use crate::value::Value;
|
|
||||||
|
|
||||||
use crate::codec::{decode_meta, decode_row, encode_meta, encode_row, row_key};
|
|
||||||
use crate::{Storage, StorageError};
|
|
||||||
|
|
||||||
const META_TREE: &str = "__meta";
|
|
||||||
|
|
||||||
fn backend<E: std::error::Error + Send + Sync + 'static>(err: E) -> StorageError {
|
|
||||||
StorageError::Backend(Box::new(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sled-backed [`Storage`] implementation.
|
|
||||||
pub struct SledStorage {
|
|
||||||
db: sled::Db,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SledStorage {
|
|
||||||
/// Open or create a sled database at `path`.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns [`StorageError::Backend`] if sled fails to open the path.
|
|
||||||
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StorageError> {
|
|
||||||
let db = sled::open(path).map_err(backend)?;
|
|
||||||
Ok(Self { db })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn meta_tree(&self) -> Result<sled::Tree, StorageError> {
|
|
||||||
self.db.open_tree(META_TREE).map_err(backend)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn relation_tree(&self, name: &str) -> Result<sled::Tree, StorageError> {
|
|
||||||
self.db.open_tree(name).map_err(backend)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_meta(&self, name: &str) -> Result<(u32, u64), StorageError> {
|
|
||||||
let meta = self.meta_tree()?;
|
|
||||||
let raw = meta
|
|
||||||
.get(name.as_bytes())
|
|
||||||
.map_err(backend)?
|
|
||||||
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
|
|
||||||
Ok(decode_meta(raw.as_ref())?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn store_meta(&self, name: &str, arity: u32, next_id: u64) -> Result<(), StorageError> {
|
|
||||||
let meta = self.meta_tree()?;
|
|
||||||
let encoded = encode_meta(arity, next_id);
|
|
||||||
meta.insert(name.as_bytes(), encoded.as_ref())
|
|
||||||
.map_err(backend)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Storage for SledStorage {
|
|
||||||
fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
|
|
||||||
if name == META_TREE {
|
|
||||||
return Err(StorageError::Validation(format!(
|
|
||||||
"relation name '{name}' is reserved"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
let meta = self.meta_tree()?;
|
|
||||||
if meta.contains_key(name.as_bytes()).map_err(backend)? {
|
|
||||||
return Err(StorageError::RelationExists(name.to_string()));
|
|
||||||
}
|
|
||||||
let arity_u32 = u32::try_from(arity)
|
|
||||||
.map_err(|_| StorageError::Validation(format!("arity {arity} exceeds u32 range")))?;
|
|
||||||
self.store_meta(name, arity_u32, 0)?;
|
|
||||||
// open_tree creates the tree if it doesn't exist
|
|
||||||
let _ = self.relation_tree(name)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arity(&self, name: &str) -> Result<usize, StorageError> {
|
|
||||||
let (arity, _) = self.load_meta(name)?;
|
|
||||||
Ok(arity as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn scan(&self, name: &str) -> Result<Vec<Vec<Value>>, StorageError> {
|
|
||||||
let _ = self.load_meta(name)?;
|
|
||||||
let tree = self.relation_tree(name)?;
|
|
||||||
let mut rows = Vec::new();
|
|
||||||
for entry in &tree {
|
|
||||||
let (_, value) = entry.map_err(backend)?;
|
|
||||||
rows.push(decode_row(value.as_ref())?);
|
|
||||||
}
|
|
||||||
Ok(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<(), StorageError> {
|
|
||||||
let (arity, next_id) = self.load_meta(name)?;
|
|
||||||
if row.len() != arity as usize {
|
|
||||||
return Err(StorageError::ArityMismatch {
|
|
||||||
expected: arity as usize,
|
|
||||||
got: row.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let tree = self.relation_tree(name)?;
|
|
||||||
tree.insert(row_key(next_id), encode_row(&row))
|
|
||||||
.map_err(backend)?;
|
|
||||||
self.store_meta(name, arity, next_id + 1)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn i(x: i64) -> Value {
|
|
||||||
Value::Int(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
let mut storage = SledStorage::open(dir.path())?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
storage.insert("edge", vec![i(1), i(2)])?;
|
|
||||||
storage.insert("edge", vec![i(2), i(3)])?;
|
|
||||||
storage.insert("edge", vec![i(3), i(3)])?;
|
|
||||||
let rows = storage.scan("edge")?;
|
|
||||||
assert_eq!(
|
|
||||||
rows,
|
|
||||||
vec![vec![i(1), i(2)], vec![i(2), i(3)], vec![i(3), i(3)],],
|
|
||||||
);
|
|
||||||
assert_eq!(storage.arity("edge")?, 2);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn duplicate_create_returns_err() -> Result<(), StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
let mut storage = SledStorage::open(dir.path())?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.create_relation("edge", 2),
|
|
||||||
Err(StorageError::RelationExists(_))
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
|
|
||||||
let dir = tempfile::tempdir().map_err(backend)?;
|
|
||||||
let mut storage = SledStorage::open(dir.path())?;
|
|
||||||
storage.create_relation("edge", 2)?;
|
|
||||||
assert!(matches!(
|
|
||||||
storage.insert("edge", vec![i(1)]),
|
|
||||||
Err(StorageError::ArityMismatch {
|
|
||||||
expected: 2,
|
|
||||||
got: 1,
|
|
||||||
})
|
|
||||||
));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user