Compare commits

No commits in common. "4b866067a488dca746cec61dac56968c515b2d71" and "4e055f34e679c058dbcc14876992a9802c9dfa64" have entirely different histories.

31 changed files with 24 additions and 2215 deletions

@ -8,7 +8,7 @@ indent_size = 4
insert_final_newline = true
trim_trailing_whitespace = true
[*.{rs,hs,py}]
[*.rs]
max_line_length = 100
[*.md]

1
.gitignore vendored

@ -81,4 +81,3 @@ tarpaulin-report.html
.claude/
.codex
.agents/
dist-newstyle/

4
.gitmodules vendored

@ -2,7 +2,3 @@
path = external/geomerge
url = gitlab@git.sgai.uk:vincent_liu/geomerge.git
branch = main
[submodule "external/geolog"]
path = external/geolog
url = gitlab@git.sgai.uk:creators/geolog.git
branch = query-plan-algebraic

@ -50,7 +50,7 @@ Expected durable areas may include:
- `src/`: Rust source for parser, catalog, planner, execution experiments, and storage prototypes.
- `tests/`: integration tests for rule planning, evaluation, and storage behavior.
- `tools/exporter/examples/`: hand-authored scenario JSON consumed by the Haskell exporter to produce runner fixtures.
- `examples/`: small runnable Datalog-like programs or storage scenarios.
- `fixtures/`: committed input facts and expected outputs.
- `notes/`: local design notes that belong to this project.
- `flowlog/`: project-local notes or sketches derived from the FlowLog line of work.

11
Cargo.lock generated

@ -1146,17 +1146,6 @@ version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "plan-runner"
version = "0.1.0"
dependencies = [
"query-ops",
"serde",
"serde_json",
"storage",
"tempfile",
]
[[package]]
name = "plotters"
version = "0.3.7"

@ -76,29 +76,6 @@ clean: ## Remove build output
cargo clean; \
fi
EXPORTER_DIR := tools/exporter
EXPORTER_FIXTURES := crates/plan-runner/fixtures
EXAMPLES_DIR := $(EXPORTER_DIR)/examples
.PHONY: export-fixtures
export-fixtures: ## Regenerate plan JSON for every tools/exporter/examples/*.scenario.json (needs Cabal and GHC; use `make shell` first).
@if ! command -v cabal >/dev/null 2>&1; then \
echo "cabal not found. Enter the dev shell with 'make shell' (or 'nix develop') first."; \
exit 1; \
fi
@cd $(EXPORTER_DIR) && cabal build plan-export
@mkdir -p $(EXPORTER_FIXTURES)
@for sc in $(EXAMPLES_DIR)/*.scenario.json; do \
base=$$(basename $$sc .scenario.json); \
out=$(EXPORTER_FIXTURES)/$$base.json; \
echo "exporting $$sc -> $$out"; \
(cd $(EXPORTER_DIR) && cabal run -v0 plan-export -- examples/$$base.scenario.json) > $$out; \
done
.PHONY: examples
examples: export-fixtures ## Regenerate fixtures from scenarios and run them through plan-runner against their oracles.
@cargo test -p plan-runner --test examples
.PHONY: shell
shell: ## Enter the Nix dev shell defined in flake.nix
@nix develop

@ -1,26 +0,0 @@
[package]
name = "plan-runner"
version = "0.1.0"
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[lints]
workspace = true
[dependencies]
query-ops = { path = "../query-ops" }
storage = { path = "../storage", features = [
"lmdb",
"redb",
"fjall",
"sqlite",
"geomerge",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tempfile = "3"
[[bin]]
name = "plan-run"
path = "src/main.rs"

@ -1,122 +0,0 @@
## Plan Runner
This crate is a snapshot executor for conjunctive-query plans.
It reads a JSON plan (a DAG of scan and join nodes plus the input facts),
walks the DAG using the operators from [`query-ops`](../query-ops),
and prints the binding relation produced at the root node.
The wire format mirrors `Geolog.DB.Plan.PlanGraph` from the
[`geolog`](../../external/geolog) submodule, but the JSON shape is the contract:
any frontend that emits this format can drive the runner.
The mapping from `PlanEvalAtom` / `PlanJoin` to `scan_atom` / `semijoin` / `natural_join`,
and the full IR spec, are documented as module-level rustdoc in
[`src/lib.rs`](src/lib.rs).
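
As a rough illustration of that mapping, the sketch below lowers the three join ops onto a semijoin and a natural join over a toy relation type. `Rel`, `rel`, `shared`, and `apply` are hypothetical stand-ins written for this example; they are not the `query-ops` API, and the real operators may differ in signature and row representation.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a binding relation: named columns plus rows.
#[derive(Debug, Clone, PartialEq)]
struct Rel {
    columns: Vec<String>,
    rows: Vec<Vec<String>>,
}

/// Small constructor so the examples stay readable.
fn rel(columns: &[&str], rows: Vec<Vec<&str>>) -> Rel {
    Rel {
        columns: columns.iter().map(|c| c.to_string()).collect(),
        rows: rows
            .into_iter()
            .map(|r| r.into_iter().map(String::from).collect())
            .collect(),
    }
}

/// Positions of the columns shared by `a` and `b`, as (index in a, index in b).
fn shared(a: &Rel, b: &Rel) -> Vec<(usize, usize)> {
    a.columns
        .iter()
        .enumerate()
        .filter_map(|(i, c)| b.columns.iter().position(|d| d == c).map(|j| (i, j)))
        .collect()
}

/// Keep the rows of `a` whose shared-column values occur in some row of `b`.
fn semijoin(a: &Rel, b: &Rel) -> Rel {
    let keys = shared(a, b);
    let seen: HashSet<Vec<String>> = b
        .rows
        .iter()
        .map(|r| keys.iter().map(|&(_, j)| r[j].clone()).collect())
        .collect();
    let rows = a
        .rows
        .iter()
        .filter(|r| {
            let k: Vec<String> = keys.iter().map(|&(i, _)| r[i].clone()).collect();
            seen.contains(&k)
        })
        .cloned()
        .collect();
    Rel { columns: a.columns.clone(), rows }
}

/// Pair up rows that agree on the shared columns; append b's non-shared columns.
fn natural_join(a: &Rel, b: &Rel) -> Rel {
    let keys = shared(a, b);
    let extra: Vec<usize> = (0..b.columns.len())
        .filter(|j| !keys.iter().any(|&(_, k)| k == *j))
        .collect();
    let mut columns = a.columns.clone();
    columns.extend(extra.iter().map(|&j| b.columns[j].clone()));
    let mut rows = Vec::new();
    for ra in &a.rows {
        for rb in &b.rows {
            if keys.iter().all(|&(i, j)| ra[i] == rb[j]) {
                let mut row = ra.clone();
                row.extend(extra.iter().map(|&j| rb[j].clone()));
                rows.push(row);
            }
        }
    }
    Rel { columns, rows }
}

#[derive(Debug, Clone, Copy)]
enum JoinOp {
    Left,
    Right,
    Natural,
}

/// Left keeps left rows, Right keeps right rows, Natural combines both sides.
fn apply(op: JoinOp, left: &Rel, right: &Rel) -> Rel {
    match op {
        JoinOp::Left => semijoin(left, right),
        JoinOp::Right => semijoin(right, left),
        JoinOp::Natural => natural_join(left, right),
    }
}

fn main() {
    let edge = rel(
        &["a", "b", "w"],
        vec![vec!["node:1", "node:2", "edge:1"], vec!["node:9", "node:2", "edge:9"]],
    );
    let node = rel(&["a"], vec![vec!["node:1"], vec!["node:2"]]);
    for op in [JoinOp::Left, JoinOp::Right, JoinOp::Natural] {
        println!("{op:?}: {:?}", apply(op, &edge, &node));
    }
}
```

Note that `Right` only swaps the semijoin arguments, so its output carries the right-hand relation's columns rather than the left's.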
### Pipeline
End-to-end, scenarios become runner output through three stages:
```text
tools/exporter/examples/*.scenario.json
└── (Haskell exporter; runs Geolog.DB.Plan.planConjunction
and Geolog.DB.InMemory.evalConjunctionPlanned as a self-check)
└── crates/plan-runner/fixtures/*.json (JSON IR; checked in)
└── (plan-runner; this crate)
└── stdout JSON, with row-for-row oracle check
```
The exporter (`tools/exporter`) is the only producer of runner IR today;
it's where atoms are planned and rejected if they don't fit the supported subset.
Fixtures are regenerated with `make export-fixtures`, and the full loop is `make examples`.
### Backends
The CLI takes a `--backend` flag.
The `memory` backend is the pure in-memory path;
every other backend routes facts through the [`Storage`](../storage) trait
via `build_tables_via_storage`, then scans tables back out before executing.
| Backend | Storage | Location |
|------------------|--------------------------------------------------|--------------------------------|
| `memory` | none (direct from `plan.facts`) | n/a |
| `memory-storage` | `MemoryStorage` | in-process |
| `lmdb` | `LmdbStorage` (heed-backed mmap B-tree) | fresh tempdir per run |
| `redb` | `RedbStorage` (single-file B-tree) | fresh tempdir per run |
| `fjall` | `FjallStorage` (LSM tree) | fresh tempdir per run |
| `sqlite` | `SqliteStorage` (rusqlite, bundled libsqlite3) | fresh tempdir per run |
| `geomerge` | `GeomergeStorage` (CRDT; alpha) | in-process |
All seven produce byte-identical output for every checked-in fixture.
The point of the abstraction is not performance comparison
(the snapshot evaluator is bulk-materialized either way),
but to validate that the storage layer is genuinely backend-neutral
and that adding a new adapter is a constructor swap.
Note on `geomerge`:
the runner's JSON IR is untyped (only arity per relation),
but geomerge requires a typed theory upfront.
The CLI infers column types from the first fact row per relation
and synthesizes a theory of `PrimInt` and `PrimString` columns via
[`GeomergeStorage::with_relations`](../storage/src/adapters/geomerge.rs).
Columns with no sample facts default to `PrimString`.
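
The inference rule described above could look roughly like the following. This is a minimal sketch under stated assumptions: the `Value` and `ColumnKind` enums here are local stand-ins named after the README's `PrimInt` / `PrimString`, and `infer_kinds` is a hypothetical helper, not the CLI's actual function or the adapter's real types.

```rust
/// Local stand-in for a fact cell (assumption: ints and strings only).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Str(String),
}

/// Hypothetical column kinds, named after the README's `PrimInt` / `PrimString`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnKind {
    PrimInt,
    PrimString,
}

/// Infer one kind per column from the first fact row of a relation.
fn infer_kinds(arity: usize, rows: &[Vec<Value>]) -> Vec<ColumnKind> {
    (0..arity)
        .map(|i| match rows.first().and_then(|row| row.get(i)) {
            Some(Value::Int(_)) => ColumnKind::PrimInt,
            // Strings, and columns with no sample fact, fall back to PrimString.
            Some(Value::Str(_)) | None => ColumnKind::PrimString,
        })
        .collect()
}

fn main() {
    let rows = vec![vec![Value::Str("node:1".to_string()), Value::Int(7)]];
    println!("{:?}", infer_kinds(2, &rows));
    // A relation with no facts gets PrimString for every column.
    println!("{:?}", infer_kinds(2, &[]));
}
```

Only the first row is consulted, so a relation whose later rows mix types in one column would not be detected by this rule.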
### Run It
```sh
# Run one fixture through the default in-memory path:
cargo run -p plan-runner -- crates/plan-runner/fixtures/two_atom_join.json
# Same plan, routed through different backends:
cargo run -p plan-runner -- --backend memory-storage crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend lmdb crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend redb crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend fjall crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend sqlite crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend geomerge crates/plan-runner/fixtures/two_atom_join.json
# Regenerate every fixture from its scenario and run the oracle test:
make examples
```
A sample run:
```sh
$ plan-run crates/plan-runner/fixtures/two_atom_join.json
{"columns":["a","b","_w0_2"],"rows":[["node:1","node:2","edge:1"],["node:2","node:1","edge:2"]]}
```
The `_w<atomIdx>_<pos>` columns are wildcards the exporter named so the runner can bind them.
The scenario's `expected_bindings` block names only the variables the test cares about,
and `verify` projects the runner output to that subset before comparing as a multiset.
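
The projection-then-multiset comparison can be sketched as follows. This is an illustration only: `matches_oracle` is a hypothetical helper that works on plain string cells and relies on their natural ordering, whereas the crate's `verify` works on `Value` and returns a richer error type.

```rust
/// Project `actual_rows` onto `expected_cols` and compare with `expected_rows`
/// as a multiset. Returns `None` if an expected column is missing from the output.
fn matches_oracle(
    actual_cols: &[&str],
    actual_rows: &[Vec<&str>],
    expected_cols: &[&str],
    expected_rows: &[Vec<&str>],
) -> Option<bool> {
    // Index of each expected column in the actual output; bail out if absent.
    let idx: Vec<usize> = expected_cols
        .iter()
        .map(|c| actual_cols.iter().position(|a| a == c))
        .collect::<Option<_>>()?;
    // Drop every column the oracle doesn't name (for example wildcards).
    let mut got: Vec<Vec<&str>> = actual_rows
        .iter()
        .map(|row| idx.iter().map(|&i| row[i]).collect())
        .collect();
    let mut want = expected_rows.to_vec();
    // Sorting both sides makes the comparison order-insensitive but duplicate-sensitive.
    got.sort();
    want.sort();
    Some(got == want)
}

fn main() {
    let actual_cols = ["a", "b", "_w0_2"];
    let actual_rows = [
        vec!["node:1", "node:2", "edge:1"],
        vec!["node:2", "node:1", "edge:2"],
    ];
    // The oracle lists rows in a different order and omits the wildcard column.
    let expected_rows = [vec!["node:2", "node:1"], vec!["node:1", "node:2"]];
    let ok = matches_oracle(&actual_cols, &actual_rows, &["a", "b"], &expected_rows);
    println!("{ok:?}");
}
```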
### Run the Tests
```sh
cargo test -p plan-runner
```
The two integration test files exercise complementary properties:
- `tests/examples.rs` walks every fixture and checks it against its `expected_bindings` oracle.
- `tests/storage_roundtrip.rs` cross-checks the pure path against the storage-backed path,
to keep `build_tables` and `build_tables_via_storage` in lockstep.
### Notes
- **IR contract.**
The runner is backend-agnostic and frontend-agnostic:
it consumes JSON in the shape documented in `src/lib.rs` and produces a binding relation.
Anything that emits the same JSON can drive it.
- **No optimizer.**
Plans are executed as written.
Node ordering, join shape, and antijoin scheduling are all the producer's responsibility.
This crate's job ends at faithful execution of the IR.
- **Wildcard columns survive.**
`scan_atom` keeps every distinct variable that appears in the pattern,
including the exporter's synthetic `_w<atomIdx>_<pos>` names.
The runner does not project them out;
oracle verification handles that on the comparison side.
- **Bulk, not streaming.**
Each node materializes its full output as a `Relation`.
This matches `query-ops`' execution model;
it's not designed for incremental or maintained-view workloads.

@ -1,114 +0,0 @@
{
"_scenario": "cartesian",
"expected_bindings": {
"columns": [
"a",
"b"
],
"rows": [
[
{
"str": "left:1"
},
{
"str": "right:10"
}
],
[
{
"str": "left:1"
},
{
"str": "right:20"
}
],
[
{
"str": "left:2"
},
{
"str": "right:10"
}
],
[
{
"str": "left:2"
},
{
"str": "right:20"
}
]
]
},
"facts": {
"left": [
[
{
"str": "left:1"
}
],
[
{
"str": "left:2"
}
]
],
"right": [
[
{
"str": "right:10"
}
],
[
{
"str": "right:20"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "a"
}
],
"table": "left"
}
},
"id": 1
},
{
"action": {
"scan": {
"columns": [
{
"var": "b"
}
],
"table": "right"
}
},
"id": 2
},
{
"action": {
"join": {
"left": 1,
"op": "natural",
"right": 2
}
},
"id": 3
}
],
"root": 3
},
"schema": {
"left": 1,
"right": 1
}
}

@ -1,84 +0,0 @@
{
"_scenario": "self-loop",
"expected_bindings": {
"columns": [
"x"
],
"rows": [
[
{
"str": "node:2"
}
],
[
{
"str": "node:3"
}
]
]
},
"facts": {
"edge": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "edge:1"
}
],
[
{
"str": "node:2"
},
{
"str": "node:2"
},
{
"str": "edge:2"
}
],
[
{
"str": "node:3"
},
{
"str": "node:3"
},
{
"str": "edge:3"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "x"
},
{
"var": "x"
},
{
"var": "_w0_2"
}
],
"table": "edge"
}
},
"id": 1
}
],
"root": 1
},
"schema": {
"edge": 3
}
}

@ -1,186 +0,0 @@
{
"_scenario": "three-atom-chain",
"expected_bindings": {
"columns": [
"a",
"b",
"c"
],
"rows": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "node:3"
}
]
]
},
"facts": {
"edge": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "edge:1"
}
],
[
{
"str": "node:2"
},
{
"str": "node:3"
},
{
"str": "edge:2"
}
]
],
"node": [
[
{
"str": "node:1"
}
],
[
{
"str": "node:2"
}
],
[
{
"str": "node:3"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "a"
},
{
"var": "b"
},
{
"var": "_w0_2"
}
],
"table": "edge"
}
},
"id": 1
},
{
"action": {
"scan": {
"columns": [
{
"var": "b"
},
{
"var": "c"
},
{
"var": "_w1_2"
}
],
"table": "edge"
}
},
"id": 2
},
{
"action": {
"scan": {
"columns": [
{
"var": "a"
}
],
"table": "node"
}
},
"id": 3
},
{
"action": {
"join": {
"left": 1,
"op": "left",
"right": 3
}
},
"id": 4
},
{
"action": {
"join": {
"left": 2,
"op": "left",
"right": 4
}
},
"id": 5
},
{
"action": {
"join": {
"left": 5,
"op": "right",
"right": 4
}
},
"id": 6
},
{
"action": {
"join": {
"left": 6,
"op": "right",
"right": 3
}
},
"id": 7
},
{
"action": {
"join": {
"left": 6,
"op": "natural",
"right": 7
}
},
"id": 8
},
{
"action": {
"join": {
"left": 5,
"op": "natural",
"right": 8
}
},
"id": 9
}
],
"root": 9
},
"schema": {
"edge": 3,
"node": 1
}
}

@ -1,136 +0,0 @@
{
"_scenario": "two-atom-join",
"expected_bindings": {
"columns": [
"a",
"b"
],
"rows": [
[
{
"str": "node:1"
},
{
"str": "node:2"
}
],
[
{
"str": "node:2"
},
{
"str": "node:1"
}
]
]
},
"facts": {
"edge": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "edge:1"
}
],
[
{
"str": "node:2"
},
{
"str": "node:1"
},
{
"str": "edge:2"
}
]
],
"node": [
[
{
"str": "node:1"
}
],
[
{
"str": "node:2"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "a"
},
{
"var": "b"
},
{
"var": "_w0_2"
}
],
"table": "edge"
}
},
"id": 1
},
{
"action": {
"scan": {
"columns": [
{
"var": "a"
}
],
"table": "node"
}
},
"id": 2
},
{
"action": {
"join": {
"left": 1,
"op": "left",
"right": 2
}
},
"id": 3
},
{
"action": {
"join": {
"left": 3,
"op": "right",
"right": 2
}
},
"id": 4
},
{
"action": {
"join": {
"left": 3,
"op": "natural",
"right": 4
}
},
"id": 5
}
],
"root": 5
},
"schema": {
"edge": 3,
"node": 1
}
}

@ -1,540 +0,0 @@
//! Snapshot executor for conjunctive-query plans.
//!
//! Takes a structural plan (a DAG of `Scan` and `Join` nodes), the input
//! tables it scans, and walks the DAG via [`query_ops::atom::scan_atom`],
//! [`query_ops::join::semijoin`], and [`query_ops::join::natural_join`].
//! The result is a binding [`Relation`](query_ops::relation::Relation) over
//! the query's variables.
//!
//! The runner is intentionally backend-agnostic: it depends only on
//! `query-ops`, and the planner that emits the JSON IR is decoupled from
//! the storage backend that produced the facts. To execute a plan against
//! a [`Storage`](storage::Storage) backend, materialize each input table
//! with [`storage::scan_as_table`] and call [`execute`] with the resulting
//! map. The in-tree `tests/storage_roundtrip.rs` is the canonical example.
//!
//! The JSON IR mirrors `Geolog.DB.Plan.PlanGraph` and
//! `Geolog.DB.InMemory.QAtom` from the `external/geolog` submodule, but the
//! shape is the contract: any frontend that emits this JSON can use the
//! runner.
//!
//! Operator mapping from the Haskell planner:
//!
//! | `Geolog.DB.Plan` | this crate |
//! |-----------------------------|-----------------------------------------------|
//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` |
//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` |
//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` |
//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` |
//!
//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`]
//! repeated across positions enforces equality, [`Term::Lit`] filters by
//! constant, and distinct variables project in first-occurrence order.
use std::collections::HashMap;
use serde::Deserialize;
use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_ops::join::{natural_join, semijoin};
use query_ops::relation::Relation;
use storage::table::Table;
use storage::value::Value;
use storage::{Storage, StorageError, scan_as_table};
/// A single fixture: schema, ground facts, and a query plan to execute.
#[derive(Debug, Clone, Deserialize)]
pub struct Plan {
/// Relation name → arity (column count).
pub schema: HashMap<String, usize>,
/// Relation name → list of ground tuples to insert before execution.
pub facts: HashMap<String, Vec<Vec<JsonValue>>>,
/// The join DAG itself.
pub query: Query,
/// Optional oracle: if present, [`verify`] cross-checks an executed
/// [`Relation`] against this projection. The exporter lifts the
/// scenario's `expected_bindings` block into this field.
#[serde(default)]
pub expected_bindings: Option<ExpectedBindings>,
}
/// Expected query result, projected to a named subset of variables. The
/// columns named here must all appear in the runner's output; any extra
/// columns (typically per-atom wildcards) are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct ExpectedBindings {
pub columns: Vec<String>,
pub rows: Vec<Vec<JsonValue>>,
}
/// Mirrors `Geolog.DB.Plan.PlanGraph`: a set of nodes plus the id of the
/// rooted result node (the last node in topological order).
#[derive(Debug, Clone, Deserialize)]
pub struct Query {
pub root: u32,
pub nodes: Vec<Node>,
}
/// One node of the plan DAG. `id`s don't need to start at any particular
/// value, mirroring the Haskell `PlanNodeId`.
#[derive(Debug, Clone, Deserialize)]
pub struct Node {
pub id: u32,
pub action: Action,
}
/// What to compute at a node. Tagged externally so JSON reads as
/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Action {
Scan(Atom),
Join(Join),
}
/// A flat atom pattern, one entry per column of the target relation.
/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`:
/// `qaValues` positions are filled in directly, and the entity-id column
/// (if any) appears at the last position. Wildcard positions in the
/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a
/// fresh, unique variable name on this side, which the operator binds but
/// never joins against.
#[derive(Debug, Clone, Deserialize)]
pub struct Atom {
pub table: String,
pub columns: Vec<JsonTerm>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonTerm {
Var(String),
Lit(JsonValue),
}
/// Wire-level value tag. Restricted to what
/// [`storage::value::Value`](storage::value::Value) carries. Entity identities from
/// the Haskell side (`ValEntity path id`) round-trip through `Str` using a
/// `"path:id"` convention; that's a fixture concern, not a runner concern.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonValue {
Int(i64),
Str(String),
}
#[derive(Debug, Clone, Deserialize)]
pub struct Join {
pub op: JoinOp,
pub left: u32,
pub right: u32,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinOp {
/// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns
/// appear in `right`. Lowered to `semijoin(left, right)`.
Left,
/// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared
/// columns appear in `left`. Lowered to `semijoin(right, left)`.
Right,
/// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`.
Natural,
}
/// Errors produced by [`verify`] when actual bindings don't match the
/// scenario's `expected_bindings` projection.
#[derive(Debug)]
pub enum VerifyError {
/// An expected column wasn't produced by the plan.
MissingColumn(String),
/// An expected row's width didn't match the column count.
ExpectedRowArity { expected: usize, got: usize },
/// The expected and actual rows (after projection) differ as multisets.
BindingsMismatch {
expected: Vec<Vec<Value>>,
actual: Vec<Vec<Value>>,
},
}
impl std::fmt::Display for VerifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingColumn(name) => {
write!(f, "expected column {name:?} not in plan output")
}
Self::ExpectedRowArity { expected, got } => write!(
f,
"expected row has {got} cells but columns has {expected} entries"
),
Self::BindingsMismatch { expected, actual } => write!(
f,
"bindings mismatch:\n expected: {expected:?}\n actual: {actual:?}"
),
}
}
}
impl std::error::Error for VerifyError {}
/// Cross-check an executed [`Relation`] against a [`Plan`]'s
/// `expected_bindings`. Projects `actual` to the expected columns (so the
/// runner is free to surface wildcard columns the oracle doesn't name) and
/// compares as a multiset.
///
/// Returns `Ok(true)` if the plan carried an oracle and it matched,
/// `Ok(false)` if there was no oracle (caller decides whether that's an
/// error). Returns [`VerifyError`] on mismatch.
///
/// # Errors
/// See [`VerifyError`].
pub fn verify(plan: &Plan, actual: &Relation) -> Result<bool, VerifyError> {
let Some(expected) = &plan.expected_bindings else {
return Ok(false);
};
let mut projection: Vec<usize> = Vec::with_capacity(expected.columns.len());
for col in &expected.columns {
let idx = actual
.columns
.iter()
.position(|c| c == col)
.ok_or_else(|| VerifyError::MissingColumn(col.clone()))?;
projection.push(idx);
}
let mut actual_proj: Vec<Vec<Value>> = actual
.rows
.iter()
.map(|row| projection.iter().map(|&i| row[i].clone()).collect())
.collect();
let mut expected_proj: Vec<Vec<Value>> = Vec::with_capacity(expected.rows.len());
for row in &expected.rows {
if row.len() != expected.columns.len() {
return Err(VerifyError::ExpectedRowArity {
expected: expected.columns.len(),
got: row.len(),
});
}
expected_proj.push(row.iter().cloned().map(Value::from).collect());
}
// Value is not Ord; use Debug-derived sort keys to compare as a multiset.
let key = |row: &[Value]| -> String { format!("{row:?}") };
actual_proj.sort_by_key(|r| key(r));
expected_proj.sort_by_key(|r| key(r));
if actual_proj == expected_proj {
Ok(true)
} else {
Err(VerifyError::BindingsMismatch {
expected: expected_proj,
actual: actual_proj,
})
}
}
/// Errors a runner can produce during plan validation and execution.
#[derive(Debug)]
pub enum RunError {
/// A fact or scan references a relation that isn't declared in `schema`.
UnknownRelation(String),
/// A scan refers to a table that wasn't supplied in the input map.
MissingTable(String),
/// A fact row's length doesn't match the schema's declared arity.
ArityMismatch {
relation: String,
expected: usize,
got: usize,
},
/// A scan's atom pattern doesn't match the table's arity.
PatternArityMismatch {
table: String,
table_arity: usize,
pattern_arity: usize,
},
/// A join node references a node id that doesn't exist.
MissingNode(u32),
/// `Query.root` doesn't match any node in `nodes`.
MissingRoot(u32),
/// Two nodes share the same id.
DuplicateNode(u32),
/// A join node references its left or right side before that side has
/// been computed: the DAG isn't actually topologically sorted by id, or
/// it has a cycle.
UnresolvedDependency { node: u32, depends_on: u32 },
/// A [`Storage`] backend used to materialize tables returned an error.
Storage(StorageError),
}
impl From<StorageError> for RunError {
fn from(err: StorageError) -> Self {
Self::Storage(err)
}
}
impl std::fmt::Display for RunError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownRelation(name) => {
write!(f, "facts reference relation {name:?} not in schema")
}
Self::MissingTable(name) => write!(f, "scan references missing table {name:?}"),
Self::ArityMismatch {
relation,
expected,
got,
} => write!(
f,
"relation {relation:?}: row arity {got} differs from schema arity {expected}"
),
Self::PatternArityMismatch {
table,
table_arity,
pattern_arity,
} => write!(
f,
"scan of {table:?}: pattern has {pattern_arity} columns, table has {table_arity}"
),
Self::MissingNode(id) => write!(f, "plan references missing node id {id}"),
Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"),
Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"),
Self::UnresolvedDependency { node, depends_on } => write!(
f,
"node {node} depends on {depends_on}, which has not been computed yet"
),
Self::Storage(err) => write!(f, "storage backend error: {err}"),
}
}
}
impl std::error::Error for RunError {}
impl From<JsonValue> for Value {
fn from(jv: JsonValue) -> Self {
match jv {
JsonValue::Int(n) => Self::Int(n),
JsonValue::Str(s) => Self::Str(s),
}
}
}
impl From<JsonTerm> for Term {
fn from(t: JsonTerm) -> Self {
match t {
JsonTerm::Var(name) => Self::Var(name),
JsonTerm::Lit(value) => Self::Lit(value.into()),
}
}
}
/// Parse a [`Plan`] from a JSON string.
///
/// # Errors
/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the
/// expected shape.
pub fn parse_plan(json: &str) -> Result<Plan, serde_json::Error> {
serde_json::from_str(json)
}
/// Build the input [`Table`] for each relation declared in a [`Plan`]'s
/// schema, populating rows from the plan's `facts` map. Relations with no
/// facts get an empty table at the declared arity.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] if `facts` mentions a relation
/// not in `schema`, or [`RunError::ArityMismatch`] if a row's width doesn't
/// match the declared arity.
pub fn build_tables(plan: &Plan) -> Result<HashMap<String, Table>, RunError> {
let mut tables: HashMap<String, Table> = plan
.schema
.iter()
.map(|(name, arity)| (name.clone(), Table::new(*arity)))
.collect();
for (name, rows) in &plan.facts {
let Some(table) = tables.get_mut(name) else {
return Err(RunError::UnknownRelation(name.clone()));
};
for row in rows {
if row.len() != table.arity {
return Err(RunError::ArityMismatch {
relation: name.clone(),
expected: table.arity,
got: row.len(),
});
}
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
table.push(cells);
}
}
Ok(tables)
}
/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then
/// materialize each declared relation back into an in-memory [`Table`] via
/// [`scan_as_table`]. The returned map is the same shape [`execute`]
/// consumes, so this is the storage-backed analogue of [`build_tables`].
///
/// Adding a new backend means constructing a different `S` at the call
/// site; the body here doesn't need to change.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] or [`RunError::ArityMismatch`] on
/// the same conditions as [`build_tables`], or [`RunError::Storage`] when
/// the backend itself rejects an operation.
pub fn build_tables_via_storage<S: Storage>(
plan: &Plan,
storage: &mut S,
) -> Result<HashMap<String, Table>, RunError> {
for (name, arity) in &plan.schema {
storage.create_relation(name, *arity)?;
}
{
let mut tx = storage.transaction()?;
for (name, rows) in &plan.facts {
let Some(&arity) = plan.schema.get(name) else {
return Err(RunError::UnknownRelation(name.clone()));
};
for row in rows {
if row.len() != arity {
return Err(RunError::ArityMismatch {
relation: name.clone(),
expected: arity,
got: row.len(),
});
}
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
tx.insert(name, cells)?;
}
}
tx.commit()?;
}
let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
for name in plan.schema.keys() {
let table = scan_as_table(storage as &dyn Storage, name)?;
tables.insert(name.clone(), table);
}
Ok(tables)
}
/// Execute a query DAG against the supplied input tables, returning the
/// bindings [`Relation`] for the rooted plan node.
///
/// Nodes are executed in ascending `id` order. For a Yannakakis plan as
/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort,
/// since `insertJoin` only references node ids that have already been
/// allocated. A non-monotone id ordering is rejected with
/// [`RunError::UnresolvedDependency`].
///
/// # Errors
/// Returns [`RunError::DuplicateNode`] for repeated ids,
/// [`RunError::MissingNode`] for join references to unknown ids,
/// [`RunError::UnresolvedDependency`] for join references to a node that
/// has not been computed yet,
/// [`RunError::MissingRoot`] if `query.root` isn't present,
/// [`RunError::MissingTable`] if a scan references a table not in the map,
/// or [`RunError::PatternArityMismatch`] if a scan's pattern doesn't match
/// the table's arity.
pub fn execute<S: std::hash::BuildHasher>(
tables: &HashMap<String, Table, S>,
query: &Query,
) -> Result<Relation, RunError> {
let mut seen_ids: std::collections::HashSet<u32> =
std::collections::HashSet::with_capacity(query.nodes.len());
for node in &query.nodes {
if !seen_ids.insert(node.id) {
return Err(RunError::DuplicateNode(node.id));
}
}
if !seen_ids.contains(&query.root) {
return Err(RunError::MissingRoot(query.root));
}
let mut ordered: Vec<&Node> = query.nodes.iter().collect();
ordered.sort_by_key(|n| n.id);
let mut results: HashMap<u32, Relation> = HashMap::with_capacity(ordered.len());
for node in ordered {
let computed = match &node.action {
Action::Scan(atom) => {
let table = tables
.get(&atom.table)
.ok_or_else(|| RunError::MissingTable(atom.table.clone()))?;
if atom.columns.len() != table.arity {
return Err(RunError::PatternArityMismatch {
table: atom.table.clone(),
table_arity: table.arity,
pattern_arity: atom.columns.len(),
});
}
let pattern = AtomPattern {
columns: atom.columns.iter().cloned().map(Term::from).collect(),
};
scan_atom(table, &pattern)
}
Action::Join(join) => {
let left = require_dep(&results, &seen_ids, node.id, join.left)?;
let right = require_dep(&results, &seen_ids, node.id, join.right)?;
match join.op {
JoinOp::Left => semijoin(left, right),
JoinOp::Right => semijoin(right, left),
JoinOp::Natural => natural_join(left, right),
}
}
};
results.insert(node.id, computed);
}
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
}
fn require_dep<'a>(
results: &'a HashMap<u32, Relation>,
seen: &std::collections::HashSet<u32>,
node: u32,
depends_on: u32,
) -> Result<&'a Relation, RunError> {
if let Some(rel) = results.get(&depends_on) {
Ok(rel)
} else if seen.contains(&depends_on) {
Err(RunError::UnresolvedDependency { node, depends_on })
} else {
Err(RunError::MissingNode(depends_on))
}
}
/// Convenience: parse JSON, build tables from the embedded facts, and
/// execute, returning the root binding relation. Equivalent to
/// `parse_plan` + [`build_tables`] + [`execute`].
///
/// # Errors
/// Returns a JSON parse error if the input is malformed, or a [`RunError`]
/// for any later step.
pub fn run_json(json: &str) -> Result<Relation, RunFromJsonError> {
let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?;
let tables = build_tables(&plan).map_err(RunFromJsonError::Run)?;
let bindings = execute(&tables, &plan.query).map_err(RunFromJsonError::Run)?;
Ok(bindings)
}
/// Combined error from [`run_json`].
#[derive(Debug)]
pub enum RunFromJsonError {
Parse(serde_json::Error),
Run(RunError),
}
impl std::fmt::Display for RunFromJsonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Parse(err) => write!(f, "parse error: {err}"),
Self::Run(err) => write!(f, "run error: {err}"),
}
}
}
impl std::error::Error for RunFromJsonError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Parse(err) => Some(err),
Self::Run(err) => Some(err),
}
}
}

@ -1,225 +0,0 @@
//! `plan-run` CLI: read a JSON plan from a file (or stdin if `-`), execute
//! it against the chosen backend, and print the resulting binding relation
//! as JSON on stdout.
//!
//! Backends:
//!
//! - `memory` (default): build tables straight from the plan's `facts`
//! block, no `Storage` trait involved. Pure in-memory path.
//! - `memory-storage`: load the same facts through `storage::MemoryStorage`
//! via the `Storage` trait, then materialize tables back out with
//! `scan_as_table` before executing.
//! - `lmdb`, `redb`, `fjall`, `sqlite`: file-backed `Storage` adapters.
//! Each invocation creates a fresh tempdir for the store and drops it on
//! exit; the runner is one-shot, so persistent paths aren't needed.
//! - `geomerge`: CRDT-backed adapter. Constructed in-memory; alpha-status
//! upstream.
use std::collections::HashMap;
use std::io::{self, Read};
use std::process::ExitCode;
use plan_runner::{JsonValue, Plan, build_tables, build_tables_via_storage, execute, parse_plan};
use storage::MemoryStorage;
use storage::adapters::fjall::FjallStorage;
use storage::adapters::geomerge::{ColumnKind, GeomergeStorage};
use storage::adapters::lmdb::LmdbStorage;
use storage::adapters::redb::RedbStorage;
use storage::adapters::sqlite::SqliteStorage;
use storage::table::Table;
use storage::value::Value;
use tempfile::TempDir;
#[derive(Debug, Clone, Copy)]
enum Backend {
Memory,
MemoryStorage,
Lmdb,
Redb,
Fjall,
Sqlite,
Geomerge,
}
impl Backend {
fn parse(s: &str) -> Option<Self> {
match s {
"memory" => Some(Self::Memory),
"memory-storage" => Some(Self::MemoryStorage),
"lmdb" => Some(Self::Lmdb),
"redb" => Some(Self::Redb),
"fjall" => Some(Self::Fjall),
"sqlite" => Some(Self::Sqlite),
"geomerge" => Some(Self::Geomerge),
_ => None,
}
}
}
const BACKENDS_HELP: &str = "memory|memory-storage|lmdb|redb|fjall|sqlite|geomerge";
fn main() -> ExitCode {
let mut backend = Backend::Memory;
let mut input_path: Option<String> = None;
let mut args = std::env::args().skip(1);
while let Some(arg) = args.next() {
match arg.as_str() {
"--backend" => {
let Some(value) = args.next() else {
eprintln!("--backend requires a value ({BACKENDS_HELP})");
return ExitCode::from(2);
};
let Some(parsed) = Backend::parse(&value) else {
eprintln!("unknown backend {value:?} (try {BACKENDS_HELP})");
return ExitCode::from(2);
};
backend = parsed;
}
other if input_path.is_none() => input_path = Some(other.to_string()),
other => {
eprintln!("unexpected argument: {other}");
return ExitCode::from(2);
}
}
}
let Some(path) = input_path else {
eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] <plan.json | ->");
return ExitCode::from(2);
};
let input = match read_input(&path) {
Ok(s) => s,
Err(err) => {
eprintln!("failed to read {path}: {err}");
return ExitCode::from(1);
}
};
let plan = match parse_plan(&input) {
Ok(p) => p,
Err(err) => {
eprintln!("parse error: {err}");
return ExitCode::from(1);
}
};
let tables = match build_tables_for(&plan, backend) {
Ok(t) => t,
Err(err) => {
eprintln!("{err}");
return ExitCode::from(1);
}
};
let relation = match execute(&tables, &plan.query) {
Ok(r) => r,
Err(err) => {
eprintln!("execute error: {err}");
return ExitCode::from(1);
}
};
let payload = serde_json::json!({
"columns": relation.columns,
"rows": relation
.rows
.iter()
.map(|row| row.iter().map(value_to_json).collect::<Vec<_>>())
.collect::<Vec<_>>(),
});
println!("{payload}");
ExitCode::SUCCESS
}
/// Build the input tables for `plan` using `backend`. Path-based adapters
/// allocate a fresh tempdir; it drops at the end of this function, which is
/// safe because `build_tables_via_storage` fully materializes the tables
/// into owned `Vec<Value>` before returning.
fn build_tables_for(plan: &Plan, backend: Backend) -> Result<HashMap<String, Table>, String> {
match backend {
Backend::Memory => build_tables(plan).map_err(|e| format!("build error: {e}")),
Backend::MemoryStorage => {
let mut storage = MemoryStorage::default();
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (memory-storage): {e}"))
}
Backend::Lmdb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = LmdbStorage::open(dir.path())
.map_err(|e| format!("failed to open lmdb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (lmdb): {e}"))
}
Backend::Redb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = RedbStorage::open(dir.path().join("data.redb"))
.map_err(|e| format!("failed to open redb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (redb): {e}"))
}
Backend::Fjall => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = FjallStorage::open(dir.path())
.map_err(|e| format!("failed to open fjall backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (fjall): {e}"))
}
Backend::Sqlite => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = SqliteStorage::open(dir.path().join("data.sqlite"))
.map_err(|e| format!("failed to open sqlite backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (sqlite): {e}"))
}
Backend::Geomerge => {
let relations = plan
.schema
.iter()
.map(|(name, &arity)| (name.clone(), infer_column_kinds(plan, name, arity)));
let mut storage = GeomergeStorage::with_relations(relations)
.map_err(|e| format!("failed to open geomerge backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (geomerge): {e}"))
}
}
}
/// Best-effort column type inference for `geomerge`'s synthesized theory.
/// The runner IR carries only arity, so we peek at the first fact row of
/// the relation. Columns without a sample default to `String`, which
/// matches every checked-in fixture (entity identities are encoded as
/// strings by the exporter).
fn infer_column_kinds(plan: &Plan, name: &str, arity: usize) -> Vec<ColumnKind> {
let mut kinds = vec![ColumnKind::String; arity];
let Some(rows) = plan.facts.get(name) else {
return kinds;
};
let Some(first) = rows.first() else {
return kinds;
};
for (i, cell) in first.iter().take(arity).enumerate() {
kinds[i] = match cell {
JsonValue::Int(_) => ColumnKind::Int,
JsonValue::Str(_) => ColumnKind::String,
};
}
kinds
}
fn read_input(path: &str) -> io::Result<String> {
if path == "-" {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf)?;
Ok(buf)
} else {
std::fs::read_to_string(path)
}
}
fn value_to_json(value: &Value) -> serde_json::Value {
match value {
Value::Int(n) => serde_json::Value::Number((*n).into()),
Value::Str(s) => serde_json::Value::String(s.clone()),
Value::Id(id) => serde_json::Value::String(id.to_string()),
}
}
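
The first-row inference used for the `geomerge` backend can be sketched in isolation. This is a minimal sketch, not the runner's code: `Cell`, `Kind`, and `infer_kinds` are hypothetical stand-ins for `JsonValue`, `ColumnKind`, and `infer_column_kinds`, so it compiles without the workspace crates.

```rust
/// Hypothetical stand-in for the runner's `JsonValue` (assumption: only
/// integer and string cells exist, as in the match above).
#[derive(Debug, Clone, PartialEq)]
enum Cell {
    Int(i64),
    Str(String),
}

/// Hypothetical stand-in for the adapter's `ColumnKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    Int,
    Str,
}

/// Guess one kind per column by looking at the first row only.
/// Columns without a sample cell keep the `Str` default.
fn infer_kinds(rows: &[Vec<Cell>], arity: usize) -> Vec<Kind> {
    let mut kinds = vec![Kind::Str; arity];
    if let Some(first) = rows.first() {
        // `zip` stops at the shorter side, so extra cells beyond `arity`
        // and missing cells below `arity` are both handled.
        for (slot, cell) in kinds.iter_mut().zip(first) {
            *slot = match cell {
                Cell::Int(_) => Kind::Int,
                Cell::Str(_) => Kind::Str,
            };
        }
    }
    kinds
}
```

Because only the first row is sampled, a later row whose cell types differ is not detected at this stage; any mismatch would surface only when the backend receives that row.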

View File

@ -1,77 +0,0 @@
//! Walks every JSON fixture under `crates/plan-runner/fixtures/` and
//! verifies it against the `expected_bindings` the exporter lifted from
//! the matching `tools/exporter/examples/*.scenario.json`. A fixture without an oracle
//! is reported as a failure (every checked-in fixture is expected to
//! carry one).
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use plan_runner::{parse_plan, run_json, verify};
fn fixtures_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}
fn collect_fixtures() -> BTreeMap<String, String> {
let mut out = BTreeMap::new();
for entry in fs::read_dir(fixtures_dir()).expect("read fixtures/") {
let path = entry.expect("dir entry").path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let name = path
.file_stem()
.and_then(|s| s.to_str())
.expect("UTF-8 fixture name")
.to_string();
let contents = fs::read_to_string(&path).expect("read fixture");
out.insert(name, contents);
}
out
}
#[test]
fn every_fixture_runs_and_matches_its_oracle() {
let fixtures = collect_fixtures();
assert!(
!fixtures.is_empty(),
"no fixtures found in {}",
fixtures_dir().display()
);
let mut failures: Vec<String> = Vec::new();
for (name, json) in &fixtures {
let plan = match parse_plan(json) {
Ok(p) => p,
Err(err) => {
failures.push(format!("{name}: parse error: {err}"));
continue;
}
};
if plan.expected_bindings.is_none() {
failures.push(format!("{name}: fixture has no expected_bindings"));
continue;
}
let relation = match run_json(json) {
Ok(r) => r,
Err(err) => {
failures.push(format!("{name}: run error: {err}"));
continue;
}
};
match verify(&plan, &relation) {
Ok(true) => {}
Ok(false) => failures.push(format!("{name}: verify returned no-oracle unexpectedly")),
Err(err) => failures.push(format!("{name}: {err}")),
}
}
assert!(
failures.is_empty(),
"{} fixture(s) failed:\n {}",
failures.len(),
failures.join("\n ")
);
}
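
The test above records every failing fixture and asserts once at the end, instead of stopping at the first failure. A minimal sketch of that accumulate-then-assert pattern follows; `collect_failures` is a hypothetical helper and does not exist in `plan_runner`.

```rust
/// Turn `(name, outcome)` pairs into one message per failed case.
/// Hypothetical helper; the real test builds the same list inline.
fn collect_failures<'a, I>(cases: I) -> Vec<String>
where
    I: IntoIterator<Item = (&'a str, Result<(), String>)>,
{
    cases
        .into_iter()
        // Keep only the errors, prefixed with the fixture name.
        .filter_map(|(name, outcome)| outcome.err().map(|err| format!("{name}: {err}")))
        .collect()
}
```

Asserting on the collected list means a single test run reports every broken fixture, which is useful when an exporter change invalidates several fixtures at once.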

View File

@ -1,52 +0,0 @@
//! Cross-checks the two paths [`plan_runner`] exposes for materializing
//! input tables: the pure [`build_tables`] path and the [`Storage`]-routed
//! [`build_tables_via_storage`] path. Same fixture, same plan, must agree
//! row-for-row.
//!
//! This is the visible proof of the layer boundary: any new `Storage`
//! backend (LMDB, fjall, geomerge) can be checked against the pure path
//! by re-running this test with a different `S`.
use plan_runner::{build_tables, build_tables_via_storage, execute, parse_plan, run_json};
use storage::MemoryStorage;
use storage::value::Value;
const FIXTURE: &str = include_str!("../fixtures/three_atom_chain.json");
#[test]
fn storage_backed_execution_matches_pure_path() {
let plan = parse_plan(FIXTURE).expect("parse plan");
let pure_tables = build_tables(&plan).expect("build_tables");
let pure = execute(&pure_tables, &plan.query).expect("pure execute");
let mut storage = MemoryStorage::default();
let storage_tables =
build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage");
let via_storage = execute(&storage_tables, &plan.query).expect("storage execute");
assert_eq!(pure.columns, via_storage.columns);
// Scan order between MemoryStorage and the direct-from-JSON path isn't
// required to match; compare rows as a multiset. `Value` is not `Ord`
// (it carries `RowId` and `String`), so use a Debug-derived sort key.
assert_eq!(sorted_rows(&pure.rows), sorted_rows(&via_storage.rows));
}
#[test]
fn storage_backed_execution_matches_run_json_oracle() {
let plan = parse_plan(FIXTURE).expect("parse plan");
let oracle = run_json(FIXTURE).expect("run_json");
let mut storage = MemoryStorage::default();
let tables = build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage");
let via_storage = execute(&tables, &plan.query).expect("storage execute");
assert_eq!(oracle.columns, via_storage.columns);
assert_eq!(sorted_rows(&oracle.rows), sorted_rows(&via_storage.rows));
}
fn sorted_rows(rows: &[Vec<Value>]) -> Vec<String> {
let mut keys: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
keys.sort();
keys
}
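
The order-insensitive comparison used by both tests can be shown on its own. This is a sketch under assumptions: `Val` is a hypothetical stand-in for `storage::value::Value`, and `same_multiset` is an illustrative wrapper that the tests do not define.

```rust
/// Hypothetical stand-in for `storage::value::Value`; it derives `Debug`
/// but deliberately not `Ord`.
#[derive(Debug, Clone, PartialEq)]
enum Val {
    Int(i64),
    Str(String),
}

/// Render each row with `Debug` and sort the rendered strings, so rows
/// can be compared without an `Ord` impl on the value type.
fn sorted_keys(rows: &[Vec<Val>]) -> Vec<String> {
    let mut keys: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
    keys.sort();
    keys
}

/// Two row sets are equal as multisets when their sorted keys match;
/// duplicates are preserved, so row counts matter.
fn same_multiset(a: &[Vec<Val>], b: &[Vec<Val>]) -> bool {
    sorted_keys(a) == sorted_keys(b)
}
```

The approach relies on the `Debug` rendering being distinct for distinct values, which holds for derived `Debug` on simple enums like the one above.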

View File

@ -121,7 +121,7 @@ How it works (logically):
<div align="center">
<picture>
<img alt="Workflow" src="docs/diagrams/workflow.svg" height="90%" width="90%">
<img alt="Types" src="docs/diagrams/workflow.svg" height="90%" width="90%%">
</picture>
</div>

View File

@ -2,9 +2,9 @@
//!
//! Three operators are in scope:
//!
//! - [`atom::scan_atom`] scans a [`Table`] under an [`atom::AtomPattern`],
//! filtering for repeated-variable equality and literal equality, and
//! outputs a binding [`relation::Relation`].
//! - [`atom::scan_atom`] scans a [`Table`](storage::table::Table) under
//! an [`atom::AtomPattern`], filtering for repeated-variable equality and
//! literal equality, and outputs a binding [`relation::Relation`].
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values
//! appear in another.
//! - [`join::natural_join`] combines rows that agree on shared columns,
@ -14,8 +14,10 @@
//! is just an expression like
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
//!
//! `Value` and `Table` live in the `storage` crate; consumers that build
//! inputs depend on `storage` directly.
//! Foundational types [`Value`](storage::value::Value) and
//! [`Table`](storage::table::Table) live in `storage`, the
//! storage-layer crate this crate is built on; storage backends produce
//! `Table`s that operators here consume.
pub mod atom;
pub mod join;

View File

@ -24,8 +24,7 @@ This crate helps with decoupling the query execution logic from the underlying
| `adapters::redb::RedbStorage` | struct (feat) | Single-file B-tree backed `Storage`, behind the `redb` feature. Wraps `redb::WriteTransaction` for native atomic commits. |
| `adapters::fjall::FjallStorage` | struct (feat) | LSM-tree backed `Storage`, behind the `fjall` feature. Each relation gets a partition; transactions buffer inserts and apply them on commit. |
| `adapters::lmdb::LmdbStorage` | struct (feat) | mmap'd B-tree backed `Storage`, behind the `lmdb` feature. Wraps `heed`'s `RwTxn` for native atomic commits. |
| `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). Construct with `from_theory`, `from_store`, or `with_relations` (synthesizes a theory from `(name, Vec<ColumnKind>)` for callers that lack a typed schema). |
| `adapters::geomerge::ColumnKind` | enum (feat) | Primitive column type fed to `GeomergeStorage::with_relations`: `Int` maps to geomerge `PrimInt`, `String` maps to `PrimString`. Exists so callers can synthesize a theory without depending on `geolog-lang::ir` directly. |
| `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). |
Data types and their relationships:

View File

@ -93,17 +93,6 @@ fn decode_pending_row_id(bytes: &[u8]) -> Result<TempRowId, StorageError> {
}
/// Geomerge-backed [`Storage`] implementation.
/// Primitive column type used by [`GeomergeStorage::with_relations`] to
/// synthesize a theory from an untyped `(name, arity)` schema. Geomerge
/// supports `PrimInt`, `PrimString`, and entity types; only the two
/// primitives are exposed here, since callers using this constructor by
/// definition don't carry entity-target information.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnKind {
Int,
String,
}
pub struct GeomergeStorage {
store: Store,
declared: HashSet<String>,
@ -149,52 +138,6 @@ impl GeomergeStorage {
}
}
/// Build a store with a theory synthesized from a flat list of
/// `(relation_name, column_kinds)`. Each `ColumnKind` is mapped to the
/// matching `PrimType`. No entity columns and no laws are declared.
///
/// This is the convenience constructor for callers (e.g., the
/// `plan-runner` CLI) whose schema only carries arity plus a
/// column-by-column primitive-type guess taken from a sample row. It exists so
/// those callers don't have to depend on `geolog-lang::ir` directly.
///
/// # Errors
/// Returns [`StorageError::Backend`] if geomerge rejects the synthesized
/// theory.
pub fn with_relations<I, S>(relations: I) -> Result<Self, StorageError>
where
I: IntoIterator<Item = (S, Vec<ColumnKind>)>,
S: Into<String>,
{
let tables: Vec<ir::TableEntry> = relations
.into_iter()
.map(|(name, kinds)| {
let columns = kinds
.into_iter()
.map(|k| ir::ColType::PrimType {
prim: match k {
ColumnKind::Int => ir::PrimType::PrimInt,
ColumnKind::String => ir::PrimType::PrimString,
},
})
.collect();
let name: String = name.into();
ir::TableEntry {
path: name.into(),
table: ir::Schema {
columns,
primary_key: None,
},
}
})
.collect();
let theory = ir::FlatTheory {
tables,
laws: Vec::new(),
};
Self::from_theory(theory)
}
/// Borrow the underlying geomerge store (for backend-specific operations
/// like persistence, dump, or law inspection that aren't on the trait).
#[must_use]

View File

@ -154,12 +154,12 @@ impl Transaction for LmdbTx<'_> {
let Some(wtxn) = self.wtxn.as_ref() else {
unreachable!("transaction was already committed")
};
let encoded = self
let raw = self
.meta
.get(wtxn, name.as_bytes())
.map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let entry = decode_meta(encoded)?;
let entry = decode_meta(raw)?;
self.next_ids.insert(name.to_string(), entry);
entry
};

View File

@ -1,4 +1,4 @@
//! `SQLite` adapter via the `rusqlite` crate (bundled libsqlite3).
//! SQLite adapter via the `rusqlite` crate (bundled libsqlite3).
//!
//! Storage layout:
//!
@ -35,13 +35,13 @@ CREATE TABLE IF NOT EXISTS __rows (
);
";
/// `SQLite`-backed [`Storage`] implementation.
/// SQLite-backed [`Storage`] implementation.
pub struct SqliteStorage {
conn: Connection,
}
impl SqliteStorage {
/// Open or create a `SQLite` database at `path`. Pass `":memory:"` for
/// Open or create a SQLite database at `path`. Pass `":memory:"` for
/// an in-process database (useful in tests).
///
/// # Errors

1
external/geolog vendored

@ -1 +0,0 @@
Subproject commit 426d4c96d6031ccaf5e14c12c3dab496e3b4c365

View File

@ -1,5 +1,5 @@
{
description = "Storage engine playground";
description = "Storage engine playground: Rust workspace for FlowLog, DBSP, CRDT, and Geomerge experiments.";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
@ -29,13 +29,12 @@
packages = [
rustToolchain
# Diagram regeneration in crates/geomerge-demo/docs/diagrams.
pkgs.graphviz
# Cargo helpers.
pkgs.cargo-watch
pkgs.cargo-nextest
pkgs.haskell.compiler.ghc912
pkgs.cabal-install
pkgs.pkg-config
pkgs.zlib
# Pre-commit hooks (see .pre-commit-config.yaml, Makefile setup-hooks).
pkgs.pre-commit
pkgs.python3
];
@ -45,14 +44,10 @@
};
shellHook = ''
# Banner goes to stderr so `nix develop --command` invocations
# that pipe stdout (e.g. tools/exporter producing JSON) stay clean.
>&2 echo "storage-engine-playground dev shell"
>&2 echo " rustc: $(rustc --version)"
>&2 echo " cargo: $(cargo --version)"
>&2 echo " ghc: $(ghc --version)"
>&2 echo " cabal: $(cabal --version | head -1)"
>&2 echo " dot: $(dot -V 2>&1)"
echo "storage-engine-playground dev shell"
echo " rustc: $(rustc --version)"
echo " cargo: $(cargo --version)"
echo " dot: $(dot -V 2>&1)"
'';
};

View File

@ -1,24 +0,0 @@
-- cabal.project for the geolog -> Rust JSON exporter.
--
-- This file points at the geolog-lang library inside the external/geolog
-- submodule, plus the sibling packages it depends on (data-partition,
-- diagnostician, fnotation). It mirrors the submodule's own cabal.project
-- so the exporter sees the same source set the submodule's tests build
-- against.
packages:
plan-exporter.cabal
../../external/geolog/geolog-lang/geolog-lang.cabal
../../external/geolog/data-partition/data-partition.cabal
../../external/geolog/diagnostician/diagnostician.cabal
../../external/geolog/fnotation/fnotation.cabal
-- geolog-lang's DB.Plan.Render module uses a patched diagrams-graphviz.
-- Same pin as external/geolog/cabal.project.
source-repository-package
type: git
location: https://github.com/georgefst/diagrams-graphviz.git
tag: 993533c564861f9d0663d719eafd56efd95f59ba
jobs: $ncpus
semaphore: true

View File

@ -1,31 +0,0 @@
{
"name": "cartesian",
"_description": "Two disjoint atoms over different tables. Exercises the 'stray' branch of planConjunction's spanning forest (no shared variables = no edge in the intersection graph) and the linear chain of natural-joins that fullJoinForest emits over disconnected components.",
"schema": {
"left": { "columns": [{"entity": "left"}] },
"right": { "columns": [{"entity": "right"}] }
},
"facts": {
"left": [
[{"entity": ["left", 1]}],
[{"entity": ["left", 2]}]
],
"right": [
[{"entity": ["right", 10]}],
[{"entity": ["right", 20]}]
]
},
"atoms": [
{"table": "left", "values": {"0": {"var": "a"}}},
{"table": "right", "values": {"0": {"var": "b"}}}
],
"expected_bindings": {
"columns": ["a", "b"],
"rows": [
[{"entity": ["left", 1]}, {"entity": ["right", 10]}],
[{"entity": ["left", 1]}, {"entity": ["right", 20]}],
[{"entity": ["left", 2]}, {"entity": ["right", 10]}],
[{"entity": ["left", 2]}, {"entity": ["right", 20]}]
]
}
}

View File

@ -1,24 +0,0 @@
{
"name": "self-loop",
"_description": "Single-atom query with a repeated variable across two columns: edge(x, x, _). Exercises evalAtom's equality-enforcement path; the planner emits one PlanEvalAtom node and no joins.",
"schema": {
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 2]}, {"entity": ["edge", 2]}],
[{"entity": ["node", 3]}, {"entity": ["node", 3]}, {"entity": ["edge", 3]}]
]
},
"atoms": [
{"table": "edge", "values": {"0": {"var": "x"}, "1": {"var": "x"}}}
],
"expected_bindings": {
"columns": ["x"],
"rows": [
[{"entity": ["node", 2]}],
[{"entity": ["node", 3]}]
]
}
}

View File

@ -1,29 +0,0 @@
{
"name": "three-atom-chain",
"schema": {
"node": { "columns": [{"entity": "node"}] },
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"node": [
[{"entity": ["node", 1]}],
[{"entity": ["node", 2]}],
[{"entity": ["node", 3]}]
],
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 3]}, {"entity": ["edge", 2]}]
]
},
"atoms": [
{"table": "node", "values": {"0": {"var": "a"}}},
{"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}},
{"table": "edge", "values": {"0": {"var": "b"}, "1": {"var": "c"}}}
],
"expected_bindings": {
"columns": ["a", "b", "c"],
"rows": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["node", 3]}]
]
}
}

View File

@ -1,28 +0,0 @@
{
"name": "two-atom-join",
"schema": {
"node": { "columns": [{"entity": "node"}] },
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"node": [
[{"entity": ["node", 1]}],
[{"entity": ["node", 2]}]
],
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 1]}, {"entity": ["edge", 2]}]
]
},
"atoms": [
{"table": "node", "values": {"0": {"var": "a"}}},
{"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}}
],
"expected_bindings": {
"columns": ["a", "b"],
"rows": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}],
[{"entity": ["node", 2]}, {"entity": ["node", 1]}]
]
}
}

View File

@ -1,38 +0,0 @@
cabal-version: 3.4
name: plan-exporter
version: 0.1.0.0
license: MIT OR Apache-2.0
author: storage-engine-playground
synopsis: Export conjunctive-query plans as JSON for the Rust plan-runner.
description:
Reads a scenario (FlatTheory + facts + a list of QAtoms) from JSON,
runs Geolog.DB.Plan.planConjunction, and emits a plan IR JSON document
that crates/plan-runner consumes. The IR is the contract between the
Haskell frontend and the Rust executor; this tool is currently the only
producer, but any frontend that emits the same JSON shape can drive the
runner.
build-type: Simple
executable plan-export
main-is: Main.hs
hs-source-dirs: src
default-language: GHC2024
default-extensions:
BlockArguments
LambdaCase
OverloadedRecordDot
OverloadedStrings
ghc-options: -Wall
build-depends:
, aeson >=2.2
, aeson-pretty >=0.8
, algebraic-graphs >=0.7
, base
, bytestring
, containers
, fnotation
, geolog-lang
, text

View File

@ -1,354 +0,0 @@
-- | Reads a @.scenario.json@ example, plans its conjunction with
-- @Geolog.DB.Plan.planConjunction@, and writes a runner-IR JSON plan that
-- @crates\/plan-runner@ consumes.
--
-- Invocation:
--
-- @
-- cabal run plan-export -- <scenario.json>
-- @
--
-- The scenario format is documented in @examples\/README@ or by example
-- (@examples\/*.scenario.json@); the output shape is documented in
-- @crates\/plan-runner\/src\/lib.rs@.
--
-- The exporter is also a self-check: before emitting, it runs the planned
-- query through @evalConjunctionPlanned@ and verifies the bindings match
-- the scenario's @expected_bindings@. A mismatched scenario fails loudly
-- here rather than handing a bad fixture to the Rust runner.
module Main (main) where
import Algebra.Graph qualified as AG
import Control.Monad (unless)
import Data.Aeson ((.!=), (.:), (.:?), (.=))
import Data.Aeson qualified as Aeson
import Data.Aeson.Encode.Pretty qualified as AesonPretty
import Data.Aeson.Key qualified as Key
import Data.Aeson.KeyMap qualified as KM
import Data.Aeson.Types (Parser)
import Data.ByteString.Lazy.Char8 qualified as LBS8
import Data.Foldable (toList)
import Data.List (sortOn)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Data.Text (Text)
import Data.Text qualified as T
import Data.String (fromString)
import FNotation.Names (Name)
import Geolog.DB.InMemory
import Geolog.DB.Plan
import Geolog.IR qualified as IR
import System.Environment (getArgs)
import System.Exit (die)
import System.IO (hPutStrLn, stderr)
-- * Scenario file format
--
-- Mirrors @Geolog.IR.FlatTheory@ + @[(Path, [Val])]@ + @[QAtom]@. The
-- 'Expected' block is optional but, when present, the exporter cross-
-- checks it against the planner's own evaluation before emitting.
data Scenario = Scenario
{ scName :: Text
, scSchema :: Map IR.Path SchemaEntry
, scFacts :: [(IR.Path, [Val])]
, scAtoms :: [QAtom]
, scExpected :: Maybe Expected
}
deriving (Show)
data SchemaEntry = SchemaEntry
{ seColumns :: [IR.ColType]
, sePrimaryKey :: Maybe [Int]
}
deriving (Show)
data Expected = Expected
{ exColumns :: [Text]
, exRows :: [[Val]]
}
deriving (Show)
-- ** JSON parsers
parsePath :: Aeson.Value -> Parser IR.Path
parsePath = Aeson.withText "path" \t -> pure [nameFromText t]
-- | Build a single-segment 'Name' from text. Multi-segment names (which
-- would carry a non-empty 'init' field) aren't needed by any current
-- example; if a scenario wants @"a/b"@-style paths, extend this helper.
nameFromText :: Text -> Name
nameFromText = fromString . T.unpack
instance Aeson.FromJSON SchemaEntry where
parseJSON = Aeson.withObject "SchemaEntry" \o ->
SchemaEntry <$> o .: "columns" <*> o .:? "primaryKey"
instance Aeson.FromJSON IR.ColType where
parseJSON = Aeson.withObject "ColType" \o -> do
case KM.toList o of
[("entity", v)] -> IR.EntityType <$> parsePath v
[("prim", v)] -> IR.PrimType <$> parsePrim v
_ -> fail "ColType: expected {\"entity\": <path>} or {\"prim\": \"int\"|\"string\"}"
parsePrim :: Aeson.Value -> Parser IR.PrimType
parsePrim = Aeson.withText "prim type" \case
"int" -> pure IR.PrimInt
"string" -> pure IR.PrimString
other -> fail ("unknown primitive type: " <> T.unpack other)
parseVal :: Aeson.Value -> Parser Val
parseVal = Aeson.withObject "Val" \o ->
case KM.toList o of
[("int", v)] -> ValInt <$> Aeson.parseJSON v
[("str", v)] -> ValText <$> Aeson.parseJSON v
[("entity", v)] -> parseEntity v
_ -> fail "Val: expected {\"int\": ..} | {\"str\": ..} | {\"entity\": [<path>, <id>]}"
where
parseEntity = Aeson.withArray "entity" \arr -> case toList arr of
[pv, nv] -> do
p <- parsePath pv
n <- Aeson.parseJSON nv
pure (ValEntity p n)
_ -> fail "entity: expected [<path>, <id>]"
parseQVal :: Aeson.Value -> Parser QVal
parseQVal = Aeson.withObject "QVal" \o ->
case KM.toList o of
[("var", v)] -> QVar . Var <$> Aeson.parseJSON v
[("lit", v)] -> QLit <$> parseVal v
_ -> fail "QVal: expected {\"var\": \"name\"} or {\"lit\": <value>}"
parseAtom :: Aeson.Value -> Parser QAtom
parseAtom = Aeson.withObject "QAtom" \o -> do
qaTable <- o .: "table" >>= parsePath
qaRowId <- o .:? "rowId" >>= traverse parseQVal
values <- o .: "values" :: Parser (Map Text Aeson.Value)
qaValues <-
Map.fromList
<$> traverse
( \(k, v) -> case reads (T.unpack k) of
[(i, "")] -> (i,) <$> parseQVal v
_ -> fail ("non-integer key in atom values: " <> T.unpack k)
)
(Map.toList values)
pure QAtom {qaTable, qaRowId, qaValues}
parseExpected :: Aeson.Value -> Parser Expected
parseExpected = Aeson.withObject "Expected" \o -> do
exColumns <- o .: "columns"
rawRows <- o .: "rows" :: Parser [[Aeson.Value]]
exRows <- traverse (traverse parseVal) rawRows
pure Expected {exColumns, exRows}
instance Aeson.FromJSON Scenario where
parseJSON = Aeson.withObject "Scenario" \o -> do
scName <- o .:? "name" .!= "unnamed"
rawSchema <- o .: "schema" :: Parser (Map Text SchemaEntry)
let scSchema = Map.fromList [([nameFromText k], v) | (k, v) <- Map.toList rawSchema]
rawFacts <- o .:? "facts" .!= mempty :: Parser (Map Text [[Aeson.Value]])
scFacts <-
concat
<$> traverse
( \(name, rows) -> do
let path = [nameFromText name]
parsedRows <- traverse (traverse parseVal) rows
pure [(path, row) | row <- parsedRows]
)
(Map.toList rawFacts)
rawAtoms <- o .: "atoms" :: Parser [Aeson.Value]
scAtoms <- traverse parseAtom rawAtoms
scExpected <- o .:? "expected_bindings" >>= traverse parseExpected
pure Scenario {scName, scSchema, scFacts, scAtoms, scExpected}
-- * Scenario → FlatTheory + DB + atoms
toFlatTheory :: Scenario -> IR.FlatTheory
toFlatTheory sc =
IR.FlatTheory
{ tables = Map.map (\e -> IR.Table {columns = seColumns e, primaryKey = sePrimaryKey e}) sc.scSchema
, laws = Map.empty
}
populateDB :: Scenario -> DB
populateDB sc = foldl (\d (p, row) -> insertRow p row d) (fromTheory (toFlatTheory sc)) sc.scFacts
-- * JSON encoding for the plan-runner IR
--
-- The output shape matches the one documented in
-- @crates\/plan-runner\/src\/lib.rs@.
pathText :: IR.Path -> Text
pathText = T.intercalate "/" . map (T.pack . show)
pathKey :: IR.Path -> Aeson.Key
pathKey = Key.fromText . pathText
encodeValue :: Val -> Aeson.Value
encodeValue =
Aeson.object . pure . \case
ValInt n -> "int" .= n
ValText t -> "str" .= t
ValEntity p n -> "str" .= (pathText p <> ":" <> T.pack (show n))
encodeTerm :: QVal -> Aeson.Value
encodeTerm = \case
QVar (Var name) -> Aeson.object ["var" .= name]
QLit v -> Aeson.object ["lit" .= encodeValue v]
flattenAtom :: Int -> Int -> QAtom -> [Aeson.Value]
flattenAtom atomIdx arity qa =
[ encodeTerm (Map.findWithDefault (wildcard atomIdx pos) pos merged)
| pos <- [0 .. arity - 1]
]
where
merged = case qa.qaRowId of
Nothing -> qa.qaValues
Just v -> Map.insert (arity - 1) v qa.qaValues
wildcard a p = QVar (Var (T.pack ("_w" <> show a <> "_" <> show p)))
encodeAtom :: Map IR.Path IR.Table -> Int -> QAtom -> Aeson.Value
encodeAtom tables atomIdx qa =
Aeson.object
[ "table" .= pathText qa.qaTable
, "columns" .= flattenAtom atomIdx arity qa
]
where
arity = case Map.lookup qa.qaTable tables of
Just t -> length t.columns
Nothing -> error ("encodeAtom: unknown table " <> show qa.qaTable)
atomIndex :: [QAtom] -> Map QAtom Int
atomIndex atoms = Map.fromList (zip (Set.toList (Set.fromList atoms)) [0 ..])
encodeJoinOp :: JoinType -> Aeson.Value
encodeJoinOp = \case
LeftJoin -> "left"
RightJoin -> "right"
NaturalJoin -> "natural"
encodeNode :: Map IR.Path IR.Table -> Map QAtom Int -> PlanNode -> Aeson.Value
encodeNode tables idx n =
Aeson.object
[ "id" .= n.graphId.unPlanNodeId
, "action" .= case n.action of
PlanEvalAtom qa ->
let i = Map.findWithDefault 0 qa idx
in Aeson.object ["scan" .= encodeAtom tables i qa]
PlanJoin jt (PlanNodeId a) (PlanNodeId b) ->
Aeson.object
[ "join"
.= Aeson.object
[ "op" .= encodeJoinOp jt
, "left" .= a
, "right" .= b
]
]
]
encodeQuery :: Map IR.Path IR.Table -> Map QAtom Int -> PlanGraph -> Aeson.Value
encodeQuery tables idx (PlanGraph g)
| null nodes =
Aeson.object ["root" .= (0 :: Int), "nodes" .= ([] :: [Aeson.Value])]
| otherwise =
Aeson.object
[ "root" .= rootId
, "nodes" .= map (encodeNode tables idx) nodes
]
where
nodes = sortOn (.graphId.unPlanNodeId) (AG.vertexList g)
rootId = case graphRoot (PlanGraph g) of
Just (PlanNodeId i) -> i
Nothing -> (.graphId.unPlanNodeId) (last nodes)
encodeExpected :: Expected -> Aeson.Value
encodeExpected ex =
Aeson.object
[ "columns" .= exColumns ex
, "rows" .= map (map encodeValue) (exRows ex)
]
encodePlan :: Scenario -> Aeson.Value
encodePlan sc =
Aeson.object
( [ "_scenario" .= scName sc
, "schema" .= Aeson.object [pathKey p .= length (seColumns t) | (p, t) <- Map.toList sc.scSchema]
, "facts"
.= Aeson.object
[ pathKey p .= map (map encodeValue) rows
| (p, rows) <- groupedFacts sc.scFacts
]
, "query" .= encodeQuery (toFlatTheory sc).tables (atomIndex sc.scAtoms) (planConjunction sc.scAtoms)
]
++ maybe [] (\e -> ["expected_bindings" .= encodeExpected e]) sc.scExpected
)
groupedFacts :: [(IR.Path, [Val])] -> [(IR.Path, [[Val]])]
groupedFacts = go []
where
go acc [] = reverse [(p, reverse rs) | (p, rs) <- acc]
go acc ((p, row) : rest) =
let acc' = case break (\(q, _) -> q == p) acc of
(before, (q, rs) : after) -> before ++ (q, row : rs) : after
(before, []) -> before ++ [(p, [row])]
in go acc' rest
-- * Self-check
--
-- Evaluate the planned query with 'evalConjunctionPlanned' and cross-check
-- the resulting bindings against any user-supplied 'expected_bindings'.
-- Detects two classes of bug before they reach the Rust side: a scenario
-- whose 'expected' is wrong, and a plan whose evaluation fails or yields
-- different bindings.
selfCheck :: Scenario -> IO ()
selfCheck sc = do
let db = populateDB sc
case evalConjunctionPlanned db sc.scAtoms of
Left err -> die ("self-check failed for " <> T.unpack sc.scName <> ": " <> show err)
Right actual -> case sc.scExpected of
Nothing -> pure ()
Just expected -> verifyAgainstExpected sc.scName expected actual
verifyAgainstExpected :: Text -> Expected -> Bindings -> IO ()
verifyAgainstExpected name expected actual = do
let actualCols = actual.cols
expectedCols = Set.fromList (map Var (exColumns expected))
unless (Set.isSubsetOf expectedCols actualCols) $
die $
"self-check failed for "
<> T.unpack name
<> ": expected_bindings names columns not produced by the plan: "
<> show (Set.difference expectedCols actualCols)
let projectedActual = Set.map (`projectOn` exColumns expected) actual.table
expectedProjected = Set.fromList (map (zip (exColumns expected)) (exRows expected))
expectedSet = Set.map (Map.fromList . map (\(v, x) -> (Var v, x))) expectedProjected
unless (projectedActual == expectedSet) $
die $
"self-check failed for "
<> T.unpack name
<> ":\n expected: "
<> show expectedSet
<> "\n actual: "
<> show projectedActual
projectOn :: Map Var Val -> [Text] -> Map Var Val
projectOn row keys =
Map.fromList [(Var k, v) | k <- keys, Just v <- [Map.lookup (Var k) row]]
-- * Entry point
main :: IO ()
main = do
args <- getArgs
case args of
[path] -> do
raw <- LBS8.readFile path
sc <- case Aeson.eitherDecode raw of
Left err -> die ("failed to parse " <> path <> ": " <> err)
Right sc -> pure sc
selfCheck sc
LBS8.putStrLn (AesonPretty.encodePretty (encodePlan sc))
_ -> do
hPutStrLn stderr "usage: plan-export <scenario.json>"
die ""
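
For readers on the Rust side, the column-flattening rule in `flattenAtom` can be restated as a small sketch. It is a re-expression under assumptions, not code from the runner: `Term` and `flatten_atom` are hypothetical names, and literals are simplified to strings.

```rust
use std::collections::BTreeMap;

/// Hypothetical term type: a named variable or a (simplified) literal.
#[derive(Debug, Clone, PartialEq)]
enum Term {
    Var(String),
    Lit(String),
}

/// Produce one term per column position. A row-id term, when present,
/// overrides the last column; unconstrained positions get a fresh
/// wildcard variable named `_w<atom>_<pos>`.
fn flatten_atom(
    atom_idx: usize,
    arity: usize,
    values: &BTreeMap<usize, Term>,
    row_id: Option<&Term>,
) -> Vec<Term> {
    let mut merged = values.clone();
    if let (Some(term), Some(last)) = (row_id, arity.checked_sub(1)) {
        merged.insert(last, term.clone());
    }
    (0..arity)
        .map(|pos| {
            merged
                .get(&pos)
                .cloned()
                .unwrap_or_else(|| Term::Var(format!("_w{atom_idx}_{pos}")))
        })
        .collect()
}
```

Including the atom index in the wildcard name keeps wildcards from different atoms distinct, so a join over the flattened atoms never treats two unconstrained columns as a shared variable.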