This commit is contained in:
Hassan Abedi 2026-06-05 11:31:18 +02:00
parent 510662e7c9
commit 4b866067a4
30 changed files with 1800 additions and 620 deletions

View File

@ -8,7 +8,7 @@ indent_size = 4
insert_final_newline = true insert_final_newline = true
trim_trailing_whitespace = true trim_trailing_whitespace = true
[*.rs] [*.{rs,hs,py}]
max_line_length = 100 max_line_length = 100
[*.md] [*.md]

View File

@ -50,7 +50,7 @@ Expected durable areas may include:
- `src/`: Rust source for parser, catalog, planner, execution experiments, and storage prototypes. - `src/`: Rust source for parser, catalog, planner, execution experiments, and storage prototypes.
- `tests/`: integration tests for rule planning, evaluation, and storage behavior. - `tests/`: integration tests for rule planning, evaluation, and storage behavior.
- `examples/`: small runnable Datalog-like programs or storage scenarios. - `tools/exporter/examples/`: hand-authored scenario JSON consumed by the Haskell exporter to produce runner fixtures.
- `fixtures/`: committed input facts and expected outputs. - `fixtures/`: committed input facts and expected outputs.
- `notes/`: local design notes that belong to this project. - `notes/`: local design notes that belong to this project.
- `flowlog/`: project-local notes or sketches derived from the FlowLog line of work. - `flowlog/`: project-local notes or sketches derived from the FlowLog line of work.

21
Cargo.lock generated
View File

@ -555,16 +555,6 @@ dependencies = [
"wasip3", "wasip3",
] ]
[[package]]
name = "glog-runner"
version = "0.1.0"
dependencies = [
"query-ops",
"serde",
"serde_json",
"storage",
]
[[package]] [[package]]
name = "guardian" name = "guardian"
version = "1.3.0" version = "1.3.0"
@ -1156,6 +1146,17 @@ version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "plan-runner"
version = "0.1.0"
dependencies = [
"query-ops",
"serde",
"serde_json",
"storage",
"tempfile",
]
[[package]] [[package]]
name = "plotters" name = "plotters"
version = "0.3.7" version = "0.3.7"

View File

@ -77,22 +77,28 @@ clean: ## Remove build output
fi fi
EXPORTER_DIR := tools/exporter EXPORTER_DIR := tools/exporter
EXPORTER_FIXTURES := crates/glog-runner/fixtures EXPORTER_FIXTURES := crates/plan-runner/fixtures
EXPORTER_SCENARIOS := three-atom-chain EXAMPLES_DIR := $(EXPORTER_DIR)/examples
.PHONY: export-fixtures .PHONY: export-fixtures
export-fixtures: ## Regenerate JSON plan fixtures from the Haskell exporter (needs Cabal and GHC; use `make shell` first). export-fixtures: ## Regenerate plan JSON for every tools/exporter/examples/*.scenario.json (needs Cabal and GHC; use `make shell` first).
@if ! command -v cabal >/dev/null 2>&1; then \ @if ! command -v cabal >/dev/null 2>&1; then \
echo "cabal not found. Enter the dev shell with 'make shell' (or 'nix develop') first."; \ echo "cabal not found. Enter the dev shell with 'make shell' (or 'nix develop') first."; \
exit 1; \ exit 1; \
fi fi
@cd $(EXPORTER_DIR) && cabal build glog-export @cd $(EXPORTER_DIR) && cabal build plan-export
@for sc in $(EXPORTER_SCENARIOS); do \ @mkdir -p $(EXPORTER_FIXTURES)
out=$(EXPORTER_FIXTURES)/$$(echo $$sc | tr '-' '_').json; \ @for sc in $(EXAMPLES_DIR)/*.scenario.json; do \
base=$$(basename $$sc .scenario.json); \
out=$(EXPORTER_FIXTURES)/$$base.json; \
echo "exporting $$sc -> $$out"; \ echo "exporting $$sc -> $$out"; \
(cd $(EXPORTER_DIR) && cabal run -v0 glog-export -- $$sc) > $$out; \ (cd $(EXPORTER_DIR) && cabal run -v0 plan-export -- examples/$$base.scenario.json) > $$out; \
done done
.PHONY: examples
examples: export-fixtures ## Regenerate fixtures from scenarios and run them through plan-runner against their oracles.
@cargo test -p plan-runner --test examples
.PHONY: shell .PHONY: shell
shell: ## Enter the Nix dev shell defined in flake.nix shell: ## Enter the Nix dev shell defined in flake.nix
@nix develop @nix develop

View File

@ -1,344 +0,0 @@
//! End-to-end runner that executes a `geolog-lang` conjunctive-query plan
//! against this workspace's storage and `query-ops` operators.
//!
//! The upstream Haskell planner in `external/geolog/geolog-lang`
//! (`Geolog.DB.Plan`) builds a Yannakakis-style join DAG over `QAtom`s. This
//! crate accepts that DAG as JSON, materializes the input relations through
//! the [`Storage`] trait, and walks the DAG using
//! [`query_ops::atom::scan_atom`], [`query_ops::join::semijoin`], and
//! [`query_ops::join::natural_join`]. The result is a binding
//! [`Relation`](query_ops::relation::Relation) over the query's variables.
//!
//! The JSON IR mirrors `Geolog.DB.Plan.JoinPlan` and `Geolog.DB.InMemory.QAtom`
//! without depending on the Haskell side at build time. A Haskell exporter
//! that dumps `(schema, facts, JoinPlan)` to this shape is the planned
//! follow-up that completes the round trip; the IR is the contract.
//!
//! Mapping from the Haskell planner:
//!
//! | `Geolog.DB.Plan` | this crate |
//! |-----------------------------|-----------------------------------------------|
//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` |
//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` |
//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` |
//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` |
//!
//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`]
//! repeated across positions enforces equality, [`Term::Lit`] filters by
//! constant, and distinct variables project in first-occurrence order.
use std::collections::HashMap;
use serde::Deserialize;
use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_ops::join::{natural_join, semijoin};
use query_ops::relation::Relation;
use storage::value::Value;
use storage::{MemoryStorage, Storage, StorageError, scan_as_table};
/// A single fixture: schema, ground facts, and a query plan to execute.
#[derive(Debug, Clone, Deserialize)]
pub struct Plan {
/// Relation name → arity (column count).
pub schema: HashMap<String, usize>,
/// Relation name → list of ground tuples to insert before execution.
pub facts: HashMap<String, Vec<Vec<JsonValue>>>,
/// The join DAG itself.
pub query: Query,
}
/// Mirrors `Geolog.DB.Plan.JoinPlan`: a set of nodes plus the id of the
/// rooted result node.
#[derive(Debug, Clone, Deserialize)]
pub struct Query {
pub root: u32,
pub nodes: Vec<Node>,
}
/// One node of the plan DAG. `id`s are dense within a `Query` but don't need
/// to start at any particular value, mirroring the Haskell `PlanNodeId`.
#[derive(Debug, Clone, Deserialize)]
pub struct Node {
pub id: u32,
pub action: Action,
}
/// What to compute at a node. Tagged externally so JSON reads as
/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Action {
Scan(Atom),
Join(Join),
}
/// A flat atom pattern, one entry per column of the target relation.
/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`:
/// `qaValues` positions are filled in directly, and the entity-id column
/// (if any) appears at the last position. Wildcard positions in the
/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a
/// fresh, unique variable name on this side, which the operator binds but
/// never joins against.
#[derive(Debug, Clone, Deserialize)]
pub struct Atom {
pub table: String,
pub columns: Vec<JsonTerm>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonTerm {
Var(String),
Lit(JsonValue),
}
/// Wire-level value tag. Restricted to what `storage::value::Value` carries.
/// Entity identities from the Haskell side (`ValEntity path id`) round-trip
/// through `Str` for now using a `"path:id"` convention; that's a fixture
/// concern, not a runner concern.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonValue {
Int(i64),
Str(String),
}
#[derive(Debug, Clone, Deserialize)]
pub struct Join {
pub op: JoinOp,
pub left: u32,
pub right: u32,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinOp {
/// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns
/// appear in `right`. Lowered to `semijoin(left, right)`.
Left,
/// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared
/// columns appear in `left`. Lowered to `semijoin(right, left)`.
Right,
/// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`.
Natural,
}
/// Errors a runner can produce in addition to storage failures.
#[derive(Debug)]
pub enum RunError {
/// A fact references a relation that isn't declared in `schema`.
UnknownRelation(String),
/// A node id appears in a `Join` action but no node with that id exists.
MissingNode(u32),
/// `Query.root` doesn't match any node in `nodes`.
MissingRoot(u32),
/// Two nodes share the same id.
DuplicateNode(u32),
/// A join node references its left or right side before that side has
/// been computed: the DAG isn't actually topologically sorted by id, or
/// it has a cycle.
UnresolvedDependency { node: u32, depends_on: u32 },
/// Storage layer rejected an operation.
Storage(StorageError),
}
impl std::fmt::Display for RunError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownRelation(name) => {
write!(f, "facts reference relation {name:?} not in schema")
}
Self::MissingNode(id) => write!(f, "plan references missing node id {id}"),
Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"),
Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"),
Self::UnresolvedDependency { node, depends_on } => write!(
f,
"node {node} depends on {depends_on}, which has not been computed yet"
),
Self::Storage(err) => write!(f, "storage error: {err}"),
}
}
}
impl std::error::Error for RunError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Storage(err) => Some(err),
_ => None,
}
}
}
impl From<StorageError> for RunError {
fn from(err: StorageError) -> Self {
Self::Storage(err)
}
}
impl From<JsonValue> for Value {
fn from(jv: JsonValue) -> Self {
match jv {
JsonValue::Int(n) => Self::Int(n),
JsonValue::Str(s) => Self::Str(s),
}
}
}
impl From<JsonTerm> for Term {
fn from(t: JsonTerm) -> Self {
match t {
JsonTerm::Var(name) => Self::Var(name),
JsonTerm::Lit(value) => Self::Lit(value.into()),
}
}
}
/// Parse a [`Plan`] from a JSON string.
///
/// # Errors
/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the
/// expected shape.
pub fn parse_plan(json: &str) -> Result<Plan, serde_json::Error> {
serde_json::from_str(json)
}
/// Load schema and facts from a [`Plan`] into a fresh [`MemoryStorage`].
///
/// All facts are inserted in a single transaction; commit is atomic so a
/// failure on row N leaves the storage empty.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] if facts mention a relation not
/// declared in `schema`. Wraps storage failures (arity mismatch, transaction
/// errors) in [`RunError::Storage`].
pub fn load_into_memory(plan: &Plan) -> Result<MemoryStorage, RunError> {
let mut storage = MemoryStorage::default();
for (name, arity) in &plan.schema {
storage.create_relation(name, *arity)?;
}
{
let mut tx = storage.transaction()?;
for (name, rows) in &plan.facts {
if !plan.schema.contains_key(name) {
return Err(RunError::UnknownRelation(name.clone()));
}
for row in rows {
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
tx.insert(name, cells)?;
}
}
let _ = tx.commit()?;
}
Ok(storage)
}
/// Execute a plan against a storage backend, returning the bindings
/// [`Relation`] for the rooted plan node.
///
/// Nodes are executed in ascending `id` order. For a Yannakakis plan as
/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort,
/// since `insertJoin` only references node ids that have already been
/// allocated. A non-monotone id ordering is rejected with
/// [`RunError::UnresolvedDependency`].
///
/// # Errors
/// Returns [`RunError::DuplicateNode`] for repeated ids,
/// [`RunError::MissingNode`] for join references to unknown ids,
/// [`RunError::MissingRoot`] if `query.root` isn't present, and storage
/// errors during the per-scan `scan_as_table` call.
pub fn execute<S: Storage>(storage: &S, query: &Query) -> Result<Relation, RunError> {
let mut seen_ids: std::collections::HashSet<u32> =
std::collections::HashSet::with_capacity(query.nodes.len());
for node in &query.nodes {
if !seen_ids.insert(node.id) {
return Err(RunError::DuplicateNode(node.id));
}
}
if !seen_ids.contains(&query.root) {
return Err(RunError::MissingRoot(query.root));
}
let mut ordered: Vec<&Node> = query.nodes.iter().collect();
ordered.sort_by_key(|n| n.id);
let mut results: HashMap<u32, Relation> = HashMap::with_capacity(ordered.len());
for node in ordered {
let computed = match &node.action {
Action::Scan(atom) => {
let table = scan_as_table(storage, &atom.table)?;
let pattern = AtomPattern {
columns: atom.columns.iter().cloned().map(Term::from).collect(),
};
scan_atom(&table, &pattern)
}
Action::Join(join) => {
let left = require_dep(&results, &seen_ids, node.id, join.left)?;
let right = require_dep(&results, &seen_ids, node.id, join.right)?;
match join.op {
JoinOp::Left => semijoin(left, right),
JoinOp::Right => semijoin(right, left),
JoinOp::Natural => natural_join(left, right),
}
}
};
results.insert(node.id, computed);
}
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
}
fn require_dep<'a>(
results: &'a HashMap<u32, Relation>,
seen: &std::collections::HashSet<u32>,
node: u32,
depends_on: u32,
) -> Result<&'a Relation, RunError> {
if let Some(rel) = results.get(&depends_on) {
Ok(rel)
} else if seen.contains(&depends_on) {
Err(RunError::UnresolvedDependency { node, depends_on })
} else {
Err(RunError::MissingNode(depends_on))
}
}
/// Convenience: parse JSON, load it into a fresh in-memory storage, and
/// execute, returning the root binding relation.
///
/// # Errors
/// Returns a JSON parse error if the input is malformed, or a [`RunError`]
/// for any later step.
pub fn run_json(json: &str) -> Result<Relation, RunFromJsonError> {
let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?;
let storage = load_into_memory(&plan).map_err(RunFromJsonError::Run)?;
let bindings = execute(&storage, &plan.query).map_err(RunFromJsonError::Run)?;
Ok(bindings)
}
/// Combined error from [`run_json`].
#[derive(Debug)]
pub enum RunFromJsonError {
Parse(serde_json::Error),
Run(RunError),
}
impl std::fmt::Display for RunFromJsonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Parse(err) => write!(f, "parse error: {err}"),
Self::Run(err) => write!(f, "run error: {err}"),
}
}
}
impl std::error::Error for RunFromJsonError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Parse(err) => Some(err),
Self::Run(err) => Some(err),
}
}
}

View File

@ -1,59 +0,0 @@
//! `glog-run` CLI: read a JSON plan from a file (or stdin if `-`), execute
//! it against a fresh in-memory store, and print the resulting binding
//! relation as JSON on stdout.
use std::io::{self, Read};
use std::process::ExitCode;
fn main() -> ExitCode {
let mut args = std::env::args().skip(1);
let Some(path) = args.next() else {
eprintln!("usage: glog-run <plan.json | ->");
return ExitCode::from(2);
};
let input = match read_input(&path) {
Ok(s) => s,
Err(err) => {
eprintln!("failed to read {path}: {err}");
return ExitCode::from(1);
}
};
let relation = match glog_runner::run_json(&input) {
Ok(r) => r,
Err(err) => {
eprintln!("{err}");
return ExitCode::from(1);
}
};
let payload = serde_json::json!({
"columns": relation.columns,
"rows": relation
.rows
.iter()
.map(|row| row.iter().map(value_to_json).collect::<Vec<_>>())
.collect::<Vec<_>>(),
});
println!("{payload}");
ExitCode::SUCCESS
}
fn read_input(path: &str) -> io::Result<String> {
if path == "-" {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf)?;
Ok(buf)
} else {
std::fs::read_to_string(path)
}
}
fn value_to_json(value: &storage::value::Value) -> serde_json::Value {
match value {
storage::value::Value::Int(n) => serde_json::Value::Number((*n).into()),
storage::value::Value::Str(s) => serde_json::Value::String(s.clone()),
storage::value::Value::Id(id) => serde_json::Value::String(id.to_string()),
}
}

View File

@ -1,73 +0,0 @@
//! End-to-end check: run the JSON fixture and verify the resulting bindings
//! match the `DB.InMemoryTest` "matches evalConjunction on three-atom chain"
//! case from `external/geolog/geolog-lang/test/DB/InMemoryTest.hs`.
//!
//! For `node = {e1, e2, e3}` and `edge = {(e1,e2,ee1), (e2,e3,ee2)}` the
//! conjunction `node(a), edge(a, b, _), edge(b, c, _)` has exactly one
//! solution: `(a=e1, b=e2, c=e3)`.
use std::collections::BTreeMap;
use glog_runner::run_json;
use storage::value::Value;
fn fixture() -> &'static str {
include_str!("../fixtures/three_atom_chain.json")
}
fn ent(path: &str, id: u32) -> Value {
Value::Str(format!("{path}:{id}"))
}
fn project<'a>(
columns: &'a [String],
row: &'a [Value],
keep: &'a [&'a str],
) -> BTreeMap<&'a str, &'a Value> {
keep.iter()
.map(|name| {
let pos = columns
.iter()
.position(|c| c == name)
.expect("column missing");
(*name, &row[pos])
})
.collect()
}
#[test]
fn three_atom_chain_matches_haskell_oracle() {
let result = run_json(fixture()).expect("fixture should execute");
// The plan's root keeps every variable, including the per-atom wildcards
// `_r1` and `_r2`. The oracle only asserts the (a, b, c) projection.
let keep = ["a", "b", "c"];
let mut projected: Vec<BTreeMap<&str, &Value>> = result
.rows
.iter()
.map(|row| project(&result.columns, row, &keep))
.collect();
projected.sort_by_key(|m| format!("{m:?}"));
let e1 = ent("node", 1);
let e2 = ent("node", 2);
let e3 = ent("node", 3);
let expected = vec![BTreeMap::from([("a", &e1), ("b", &e2), ("c", &e3)])];
assert_eq!(projected, expected);
}
#[test]
fn root_columns_cover_a_b_c_plus_two_wildcards() {
// The exporter emits unique wildcard variable names for the entity-id
// column of each edge atom (e.g. `_w0_2`, `_w1_2`); their exact spelling
// is an implementation detail of the exporter, so this test only checks
// that the named variables are all present and that the total column
// count is the three named ones plus two anonymous wildcards.
let result = run_json(fixture()).expect("fixture should execute");
let cols: std::collections::HashSet<&str> = result.columns.iter().map(String::as_str).collect();
for expected in ["a", "b", "c"] {
assert!(cols.contains(expected), "missing column {expected}");
}
assert_eq!(result.columns.len(), 5, "expected 3 named + 2 wildcards");
}

View File

@ -1,5 +1,5 @@
[package] [package]
name = "glog-runner" name = "plan-runner"
version = "0.1.0" version = "0.1.0"
edition.workspace = true edition.workspace = true
license.workspace = true license.workspace = true
@ -9,11 +9,18 @@ rust-version.workspace = true
workspace = true workspace = true
[dependencies] [dependencies]
storage = { path = "../storage" }
query-ops = { path = "../query-ops" } query-ops = { path = "../query-ops" }
storage = { path = "../storage", features = [
"lmdb",
"redb",
"fjall",
"sqlite",
"geomerge",
] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
tempfile = "3"
[[bin]] [[bin]]
name = "glog-run" name = "plan-run"
path = "src/main.rs" path = "src/main.rs"

View File

@ -0,0 +1,122 @@
## Plan Runner
This crate is a snapshot executor for conjunctive-query plans.
It reads a JSON plan (a DAG of scan and join nodes plus the input facts),
walks the DAG using the operators from [`query-ops`](../query-ops),
and prints the binding relation produced at the root node.
The wire format mirrors `Geolog.DB.Plan.PlanGraph` from the
[`geolog`](../../external/geolog) submodule, but the JSON shape is the contract:
any frontend that emits this format can drive the runner.
The mapping from `PlanEvalAtom` / `PlanJoin` to `scan_atom` / `semijoin` / `natural_join`,
and the full IR spec, are documented as module-level rustdoc in
[`src/lib.rs`](src/lib.rs).
### Pipeline
End-to-end, scenarios become runner output through three stages:
```text
tools/exporter/examples/*.scenario.json
└── (Haskell exporter; runs Geolog.DB.Plan.planConjunction
and Geolog.DB.InMemory.evalConjunctionPlanned as a self-check)
└── crates/plan-runner/fixtures/*.json (JSON IR; checked in)
└── (plan-runner; this crate)
└── stdout JSON, with row-for-row oracle check
```
The exporter (`tools/exporter`) is the only producer of runner IR today;
it's where atoms are planned and rejected if they don't fit the supported subset.
Fixtures are regenerated with `make export-fixtures`, and the full loop is `make examples`.
### Backends
The CLI takes a `--backend` flag.
The `memory` backend is the pure in-memory path;
every other backend routes facts through the [`Storage`](../storage) trait
via `build_tables_via_storage`, then scans tables back out before executing.
| Backend | Storage | Location |
|------------------|--------------------------------------------------|--------------------------------|
| `memory` | none (direct from `plan.facts`) | n/a |
| `memory-storage` | `MemoryStorage` | in-process |
| `lmdb` | `LmdbStorage` (heed-backed mmap B-tree) | fresh tempdir per run |
| `redb` | `RedbStorage` (single-file B-tree) | fresh tempdir per run |
| `fjall` | `FjallStorage` (LSM tree) | fresh tempdir per run |
| `sqlite` | `SqliteStorage` (rusqlite, bundled libsqlite3) | fresh tempdir per run |
| `geomerge` | `GeomergeStorage` (CRDT; alpha) | in-process |
All seven produce byte-identical output for every checked-in fixture.
The point of the abstraction is not performance comparison
(the snapshot evaluator is bulk-materialized either way),
but to validate that the storage layer is genuinely backend-neutral
and that adding a new adapter is a constructor swap.
Note on `geomerge`:
the runner's JSON IR is untyped (only arity per relation),
but geomerge requires a typed theory upfront.
The CLI infers column types from the first fact row per relation
and synthesizes a theory of `PrimInt` and `PrimString` columns via
[`GeomergeStorage::with_relations`](../storage/src/adapters/geomerge.rs).
Columns with no sample facts default to `PrimString`.
### Run It
```sh
# Run one fixture through the default in-memory path:
cargo run -p plan-runner -- crates/plan-runner/fixtures/two_atom_join.json
# Same plan, routed through different backends:
cargo run -p plan-runner -- --backend memory-storage crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend lmdb crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend redb crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend fjall crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend sqlite crates/plan-runner/fixtures/two_atom_join.json
cargo run -p plan-runner -- --backend geomerge crates/plan-runner/fixtures/two_atom_join.json
# Regenerate every fixture from its scenario and run the oracle test:
make examples
```
A sample run:
```sh
$ plan-run crates/plan-runner/fixtures/two_atom_join.json
{"columns":["a","b","_w0_2"],"rows":[["node:1","node:2","edge:1"],["node:2","node:1","edge:2"]]}
```
The `_w<atomIdx>_<pos>` columns are wildcards the exporter named so the runner can bind them.
The scenario's `expected_bindings` block names only the variables the test cares about,
and `verify` projects the runner output to that subset before comparing as a multiset.
### Run the Tests
```sh
cargo test -p plan-runner
```
The two integration test files exercise complementary properties:
- `tests/examples.rs` walks every fixture and checks it against its `expected_bindings` oracle.
- `tests/storage_roundtrip.rs` cross-checks the pure path against the storage-backed path,
to keep `build_tables` and `build_tables_via_storage` in lockstep.
### Notes
- **IR contract.**
The runner is backend-agnostic and frontend-agnostic:
it consumes JSON in the shape documented in `src/lib.rs` and produces a binding relation.
Anything that emits the same JSON can drive it.
- **No optimizer.**
Plans are executed as written.
Node ordering, join shape, and antijoin scheduling are all the producer's responsibility.
This crate's job ends at faithful execution of the IR.
- **Wildcard columns survive.**
`scan_atom` keeps every distinct variable that appears in the pattern,
including the exporter's synthetic `_w<atomIdx>_<pos>` names.
The runner does not project them out;
oracle verification handles that on the comparison side.
- **Bulk, not streaming.**
Each node materializes its full output as a `Relation`.
This matches `query-ops`' execution model;
it's not designed for incremental or maintained-view workloads.

View File

@ -0,0 +1,114 @@
{
"_scenario": "cartesian",
"expected_bindings": {
"columns": [
"a",
"b"
],
"rows": [
[
{
"str": "left:1"
},
{
"str": "right:10"
}
],
[
{
"str": "left:1"
},
{
"str": "right:20"
}
],
[
{
"str": "left:2"
},
{
"str": "right:10"
}
],
[
{
"str": "left:2"
},
{
"str": "right:20"
}
]
]
},
"facts": {
"left": [
[
{
"str": "left:1"
}
],
[
{
"str": "left:2"
}
]
],
"right": [
[
{
"str": "right:10"
}
],
[
{
"str": "right:20"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "a"
}
],
"table": "left"
}
},
"id": 1
},
{
"action": {
"scan": {
"columns": [
{
"var": "b"
}
],
"table": "right"
}
},
"id": 2
},
{
"action": {
"join": {
"left": 1,
"op": "natural",
"right": 2
}
},
"id": 3
}
],
"root": 3
},
"schema": {
"left": 1,
"right": 1
}
}

View File

@ -0,0 +1,84 @@
{
"_scenario": "self-loop",
"expected_bindings": {
"columns": [
"x"
],
"rows": [
[
{
"str": "node:2"
}
],
[
{
"str": "node:3"
}
]
]
},
"facts": {
"edge": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "edge:1"
}
],
[
{
"str": "node:2"
},
{
"str": "node:2"
},
{
"str": "edge:2"
}
],
[
{
"str": "node:3"
},
{
"str": "node:3"
},
{
"str": "edge:3"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "x"
},
{
"var": "x"
},
{
"var": "_w0_2"
}
],
"table": "edge"
}
},
"id": 1
}
],
"root": 1
},
"schema": {
"edge": 3
}
}

View File

@ -1,5 +1,25 @@
{ {
"_scenario": "three-atom-chain", "_scenario": "three-atom-chain",
"expected_bindings": {
"columns": [
"a",
"b",
"c"
],
"rows": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "node:3"
}
]
]
},
"facts": { "facts": {
"edge": [ "edge": [
[ [

View File

@ -0,0 +1,136 @@
{
"_scenario": "two-atom-join",
"expected_bindings": {
"columns": [
"a",
"b"
],
"rows": [
[
{
"str": "node:1"
},
{
"str": "node:2"
}
],
[
{
"str": "node:2"
},
{
"str": "node:1"
}
]
]
},
"facts": {
"edge": [
[
{
"str": "node:1"
},
{
"str": "node:2"
},
{
"str": "edge:1"
}
],
[
{
"str": "node:2"
},
{
"str": "node:1"
},
{
"str": "edge:2"
}
]
],
"node": [
[
{
"str": "node:1"
}
],
[
{
"str": "node:2"
}
]
]
},
"query": {
"nodes": [
{
"action": {
"scan": {
"columns": [
{
"var": "a"
},
{
"var": "b"
},
{
"var": "_w0_2"
}
],
"table": "edge"
}
},
"id": 1
},
{
"action": {
"scan": {
"columns": [
{
"var": "a"
}
],
"table": "node"
}
},
"id": 2
},
{
"action": {
"join": {
"left": 1,
"op": "left",
"right": 2
}
},
"id": 3
},
{
"action": {
"join": {
"left": 3,
"op": "right",
"right": 2
}
},
"id": 4
},
{
"action": {
"join": {
"left": 3,
"op": "natural",
"right": 4
}
},
"id": 5
}
],
"root": 5
},
"schema": {
"edge": 3,
"node": 1
}
}

View File

@ -0,0 +1,540 @@
//! Snapshot executor for conjunctive-query plans.
//!
//! Takes a structural plan (a DAG of `Scan` and `Join` nodes), the input
//! tables it scans, and walks the DAG via [`query_ops::atom::scan_atom`],
//! [`query_ops::join::semijoin`], and [`query_ops::join::natural_join`].
//! The result is a binding [`Relation`](query_ops::relation::Relation) over
//! the query's variables.
//!
//! The runner is intentionally backend-agnostic: it depends only on
//! `query-ops`, and the planner that emits the JSON IR is decoupled from
//! the storage backend that produced the facts. To execute a plan against
//! a [`Storage`](storage::Storage) backend, materialize each input table
//! with [`storage::scan_as_table`] and call [`execute`] with the resulting
//! map. The in-tree `tests/storage_roundtrip.rs` is the canonical example.
//!
//! The JSON IR mirrors `Geolog.DB.Plan.PlanGraph` and
//! `Geolog.DB.InMemory.QAtom` from the `external/geolog` submodule, but the
//! shape is the contract: any frontend that emits this JSON can use the
//! runner.
//!
//! Operator mapping from the Haskell planner:
//!
//! | `Geolog.DB.Plan` | this crate |
//! |-----------------------------|-----------------------------------------------|
//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` |
//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` |
//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` |
//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` |
//!
//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`]
//! repeated across positions enforces equality, [`Term::Lit`] filters by
//! constant, and distinct variables project in first-occurrence order.
use std::collections::HashMap;
use serde::Deserialize;
use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_ops::join::{natural_join, semijoin};
use query_ops::relation::Relation;
use storage::table::Table;
use storage::value::Value;
use storage::{Storage, StorageError, scan_as_table};
/// A single fixture: schema, ground facts, and a query plan to execute.
#[derive(Debug, Clone, Deserialize)]
pub struct Plan {
/// Relation name → arity (column count).
pub schema: HashMap<String, usize>,
/// Relation name → list of ground tuples to insert before execution.
pub facts: HashMap<String, Vec<Vec<JsonValue>>>,
/// The join DAG itself.
pub query: Query,
/// Optional oracle: if present, [`verify`] cross-checks an executed
/// [`Relation`] against this projection. The exporter lifts the
/// scenario's `expected_bindings` block into this field.
#[serde(default)]
pub expected_bindings: Option<ExpectedBindings>,
}
/// Expected query result, projected to a named subset of variables. The
/// columns named here must all appear in the runner's output; any extra
/// columns (typically per-atom wildcards) are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct ExpectedBindings {
pub columns: Vec<String>,
pub rows: Vec<Vec<JsonValue>>,
}
/// Mirrors `Geolog.DB.Plan.PlanGraph`: a set of nodes plus the id of the
/// rooted result node (the last node in topological order).
#[derive(Debug, Clone, Deserialize)]
pub struct Query {
pub root: u32,
pub nodes: Vec<Node>,
}
/// One node of the plan DAG. `id`s don't need to start at any particular
/// value, mirroring the Haskell `PlanNodeId`.
#[derive(Debug, Clone, Deserialize)]
pub struct Node {
pub id: u32,
pub action: Action,
}
/// What to compute at a node. Tagged externally so JSON reads as
/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Action {
Scan(Atom),
Join(Join),
}
/// A flat atom pattern, one entry per column of the target relation.
/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`:
/// `qaValues` positions are filled in directly, and the entity-id column
/// (if any) appears at the last position. Wildcard positions in the
/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a
/// fresh, unique variable name on this side, which the operator binds but
/// never joins against.
#[derive(Debug, Clone, Deserialize)]
pub struct Atom {
pub table: String,
pub columns: Vec<JsonTerm>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonTerm {
Var(String),
Lit(JsonValue),
}
/// Wire-level value tag. Restricted to what
/// [`storage::value::Value`](storage::value::Value) carries. Entity identities from
/// the Haskell side (`ValEntity path id`) round-trip through `Str` using a
/// `"path:id"` convention; that's a fixture concern, not a runner concern.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonValue {
Int(i64),
Str(String),
}
#[derive(Debug, Clone, Deserialize)]
pub struct Join {
pub op: JoinOp,
pub left: u32,
pub right: u32,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinOp {
/// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns
/// appear in `right`. Lowered to `semijoin(left, right)`.
Left,
/// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared
/// columns appear in `left`. Lowered to `semijoin(right, left)`.
Right,
/// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`.
Natural,
}
/// Errors produced by [`verify`] when actual bindings don't match the
/// scenario's `expected_bindings` projection.
#[derive(Debug)]
pub enum VerifyError {
/// An expected column wasn't produced by the plan.
MissingColumn(String),
/// An expected row's width didn't match the column count.
ExpectedRowArity { expected: usize, got: usize },
/// The expected and actual rows (after projection) differ as multisets.
BindingsMismatch {
expected: Vec<Vec<Value>>,
actual: Vec<Vec<Value>>,
},
}
impl std::fmt::Display for VerifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingColumn(name) => {
write!(f, "expected column {name:?} not in plan output")
}
Self::ExpectedRowArity { expected, got } => write!(
f,
"expected row has {got} cells but columns has {expected} entries"
),
Self::BindingsMismatch { expected, actual } => write!(
f,
"bindings mismatch:\n expected: {expected:?}\n actual: {actual:?}"
),
}
}
}
impl std::error::Error for VerifyError {}
/// Cross-check an executed [`Relation`] against a [`Plan`]'s
/// `expected_bindings`. Projects `actual` to the expected columns (so the
/// runner is free to surface wildcard columns the oracle doesn't name) and
/// compares as a multiset.
///
/// Returns `Ok(true)` if the plan carried an oracle and it matched,
/// `Ok(false)` if there was no oracle (caller decides whether that's an
/// error). Returns [`VerifyError`] on mismatch.
///
/// # Errors
/// See [`VerifyError`].
pub fn verify(plan: &Plan, actual: &Relation) -> Result<bool, VerifyError> {
let Some(expected) = &plan.expected_bindings else {
return Ok(false);
};
let mut projection: Vec<usize> = Vec::with_capacity(expected.columns.len());
for col in &expected.columns {
let idx = actual
.columns
.iter()
.position(|c| c == col)
.ok_or_else(|| VerifyError::MissingColumn(col.clone()))?;
projection.push(idx);
}
let mut actual_proj: Vec<Vec<Value>> = actual
.rows
.iter()
.map(|row| projection.iter().map(|&i| row[i].clone()).collect())
.collect();
let mut expected_proj: Vec<Vec<Value>> = Vec::with_capacity(expected.rows.len());
for row in &expected.rows {
if row.len() != expected.columns.len() {
return Err(VerifyError::ExpectedRowArity {
expected: expected.columns.len(),
got: row.len(),
});
}
expected_proj.push(row.iter().cloned().map(Value::from).collect());
}
// Value is not Ord; use Debug-derived sort keys to compare as a multiset.
let key = |row: &[Value]| -> String { format!("{row:?}") };
actual_proj.sort_by_key(|r| key(r));
expected_proj.sort_by_key(|r| key(r));
if actual_proj == expected_proj {
Ok(true)
} else {
Err(VerifyError::BindingsMismatch {
expected: expected_proj,
actual: actual_proj,
})
}
}
/// Errors a runner can produce during plan validation and execution.
#[derive(Debug)]
pub enum RunError {
/// A fact or scan references a relation that isn't declared in `schema`.
UnknownRelation(String),
/// A scan refers to a table that wasn't supplied in the input map.
MissingTable(String),
/// A fact row's length doesn't match the schema's declared arity.
ArityMismatch {
relation: String,
expected: usize,
got: usize,
},
/// A scan's atom pattern doesn't match the table's arity.
PatternArityMismatch {
table: String,
table_arity: usize,
pattern_arity: usize,
},
/// A join node references a node id that doesn't exist.
MissingNode(u32),
/// `Query.root` doesn't match any node in `nodes`.
MissingRoot(u32),
/// Two nodes share the same id.
DuplicateNode(u32),
/// A join node references its left or right side before that side has
/// been computed: the DAG isn't actually topologically sorted by id, or
/// it has a cycle.
UnresolvedDependency { node: u32, depends_on: u32 },
/// A [`Storage`] backend used to materialize tables returned an error.
Storage(StorageError),
}
impl From<StorageError> for RunError {
fn from(err: StorageError) -> Self {
Self::Storage(err)
}
}
impl std::fmt::Display for RunError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownRelation(name) => {
write!(f, "facts reference relation {name:?} not in schema")
}
Self::MissingTable(name) => write!(f, "scan references missing table {name:?}"),
Self::ArityMismatch {
relation,
expected,
got,
} => write!(
f,
"relation {relation:?}: row arity {got} differs from schema arity {expected}"
),
Self::PatternArityMismatch {
table,
table_arity,
pattern_arity,
} => write!(
f,
"scan of {table:?}: pattern has {pattern_arity} columns, table has {table_arity}"
),
Self::MissingNode(id) => write!(f, "plan references missing node id {id}"),
Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"),
Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"),
Self::UnresolvedDependency { node, depends_on } => write!(
f,
"node {node} depends on {depends_on}, which has not been computed yet"
),
Self::Storage(err) => write!(f, "storage backend error: {err}"),
}
}
}
impl std::error::Error for RunError {}
impl From<JsonValue> for Value {
fn from(jv: JsonValue) -> Self {
match jv {
JsonValue::Int(n) => Self::Int(n),
JsonValue::Str(s) => Self::Str(s),
}
}
}
impl From<JsonTerm> for Term {
fn from(t: JsonTerm) -> Self {
match t {
JsonTerm::Var(name) => Self::Var(name),
JsonTerm::Lit(value) => Self::Lit(value.into()),
}
}
}
/// Parse a [`Plan`] from a JSON string.
///
/// # Errors
/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the
/// expected shape.
pub fn parse_plan(json: &str) -> Result<Plan, serde_json::Error> {
serde_json::from_str(json)
}
/// Build the input [`Table`] for each relation declared in a [`Plan`]'s
/// schema, populating rows from the plan's `facts` map. Relations with no
/// facts get an empty table at the declared arity.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] if `facts` mentions a relation
/// not in `schema`, or [`RunError::ArityMismatch`] if a row's width doesn't
/// match the declared arity.
pub fn build_tables(plan: &Plan) -> Result<HashMap<String, Table>, RunError> {
let mut tables: HashMap<String, Table> = plan
.schema
.iter()
.map(|(name, arity)| (name.clone(), Table::new(*arity)))
.collect();
for (name, rows) in &plan.facts {
let Some(table) = tables.get_mut(name) else {
return Err(RunError::UnknownRelation(name.clone()));
};
for row in rows {
if row.len() != table.arity {
return Err(RunError::ArityMismatch {
relation: name.clone(),
expected: table.arity,
got: row.len(),
});
}
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
table.push(cells);
}
}
Ok(tables)
}
/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then
/// materialize each declared relation back into an in-memory [`Table`] via
/// [`scan_as_table`]. The returned map is the same shape [`execute`]
/// consumes, so this is the storage-backed analogue of [`build_tables`].
///
/// Adding a new backend means constructing a different `S` at the call
/// site; the body here doesn't need to change.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] or [`RunError::ArityMismatch`] on
/// the same conditions as [`build_tables`], or [`RunError::Storage`] when
/// the backend itself rejects an operation.
pub fn build_tables_via_storage<S: Storage>(
plan: &Plan,
storage: &mut S,
) -> Result<HashMap<String, Table>, RunError> {
for (name, arity) in &plan.schema {
storage.create_relation(name, *arity)?;
}
{
let mut tx = storage.transaction()?;
for (name, rows) in &plan.facts {
let Some(&arity) = plan.schema.get(name) else {
return Err(RunError::UnknownRelation(name.clone()));
};
for row in rows {
if row.len() != arity {
return Err(RunError::ArityMismatch {
relation: name.clone(),
expected: arity,
got: row.len(),
});
}
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
tx.insert(name, cells)?;
}
}
tx.commit()?;
}
let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
for name in plan.schema.keys() {
let table = scan_as_table(storage as &dyn Storage, name)?;
tables.insert(name.clone(), table);
}
Ok(tables)
}
/// Execute a query DAG against the supplied input tables, returning the
/// bindings [`Relation`] for the rooted plan node.
///
/// Nodes are executed in ascending `id` order. For a Yannakakis plan as
/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort,
/// since `insertJoin` only references node ids that have already been
/// allocated. A non-monotone id ordering is rejected with
/// [`RunError::UnresolvedDependency`].
///
/// # Errors
/// Returns [`RunError::DuplicateNode`] for repeated ids,
/// [`RunError::MissingNode`] for join references to unknown ids,
/// [`RunError::MissingRoot`] if `query.root` isn't present,
/// [`RunError::MissingTable`] if a scan references a table not in the map,
/// or [`RunError::PatternArityMismatch`] if a scan's pattern doesn't match
/// the table's arity.
pub fn execute<S: std::hash::BuildHasher>(
tables: &HashMap<String, Table, S>,
query: &Query,
) -> Result<Relation, RunError> {
let mut seen_ids: std::collections::HashSet<u32> =
std::collections::HashSet::with_capacity(query.nodes.len());
for node in &query.nodes {
if !seen_ids.insert(node.id) {
return Err(RunError::DuplicateNode(node.id));
}
}
if !seen_ids.contains(&query.root) {
return Err(RunError::MissingRoot(query.root));
}
let mut ordered: Vec<&Node> = query.nodes.iter().collect();
ordered.sort_by_key(|n| n.id);
let mut results: HashMap<u32, Relation> = HashMap::with_capacity(ordered.len());
for node in ordered {
let computed = match &node.action {
Action::Scan(atom) => {
let table = tables
.get(&atom.table)
.ok_or_else(|| RunError::MissingTable(atom.table.clone()))?;
if atom.columns.len() != table.arity {
return Err(RunError::PatternArityMismatch {
table: atom.table.clone(),
table_arity: table.arity,
pattern_arity: atom.columns.len(),
});
}
let pattern = AtomPattern {
columns: atom.columns.iter().cloned().map(Term::from).collect(),
};
scan_atom(table, &pattern)
}
Action::Join(join) => {
let left = require_dep(&results, &seen_ids, node.id, join.left)?;
let right = require_dep(&results, &seen_ids, node.id, join.right)?;
match join.op {
JoinOp::Left => semijoin(left, right),
JoinOp::Right => semijoin(right, left),
JoinOp::Natural => natural_join(left, right),
}
}
};
results.insert(node.id, computed);
}
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
}
fn require_dep<'a>(
results: &'a HashMap<u32, Relation>,
seen: &std::collections::HashSet<u32>,
node: u32,
depends_on: u32,
) -> Result<&'a Relation, RunError> {
if let Some(rel) = results.get(&depends_on) {
Ok(rel)
} else if seen.contains(&depends_on) {
Err(RunError::UnresolvedDependency { node, depends_on })
} else {
Err(RunError::MissingNode(depends_on))
}
}
/// Convenience: parse JSON, build tables from the embedded facts, and
/// execute, returning the root binding relation. Equivalent to
/// `parse_plan` + [`build_tables`] + [`execute`].
///
/// # Errors
/// Returns a JSON parse error if the input is malformed, or a [`RunError`]
/// for any later step.
pub fn run_json(json: &str) -> Result<Relation, RunFromJsonError> {
let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?;
let tables = build_tables(&plan).map_err(RunFromJsonError::Run)?;
let bindings = execute(&tables, &plan.query).map_err(RunFromJsonError::Run)?;
Ok(bindings)
}
/// Combined error from [`run_json`].
#[derive(Debug)]
pub enum RunFromJsonError {
Parse(serde_json::Error),
Run(RunError),
}
impl std::fmt::Display for RunFromJsonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Parse(err) => write!(f, "parse error: {err}"),
Self::Run(err) => write!(f, "run error: {err}"),
}
}
}
impl std::error::Error for RunFromJsonError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Parse(err) => Some(err),
Self::Run(err) => Some(err),
}
}
}

View File

@ -0,0 +1,225 @@
//! `plan-run` CLI: read a JSON plan from a file (or stdin if `-`), execute
//! it against the chosen backend, and print the resulting binding relation
//! as JSON on stdout.
//!
//! Backends:
//!
//! - `memory` (default): build tables straight from the plan's `facts`
//! block, no `Storage` trait involved. Pure in-memory path.
//! - `memory-storage`: load the same facts through `storage::MemoryStorage`
//! via the `Storage` trait, then materialize tables back out with
//! `scan_as_table` before executing.
//! - `lmdb`, `redb`, `fjall`, `sqlite`: file-backed `Storage` adapters.
//! Each invocation creates a fresh tempdir for the store and drops it on
//! exit; the runner is one-shot, so persistent paths aren't needed.
//! - `geomerge`: CRDT-backed adapter. Constructed in-memory; alpha-status
//! upstream.
use std::collections::HashMap;
use std::io::{self, Read};
use std::process::ExitCode;
use plan_runner::{JsonValue, Plan, build_tables, build_tables_via_storage, execute, parse_plan};
use storage::MemoryStorage;
use storage::adapters::fjall::FjallStorage;
use storage::adapters::geomerge::{ColumnKind, GeomergeStorage};
use storage::adapters::lmdb::LmdbStorage;
use storage::adapters::redb::RedbStorage;
use storage::adapters::sqlite::SqliteStorage;
use storage::table::Table;
use storage::value::Value;
use tempfile::TempDir;
#[derive(Debug, Clone, Copy)]
enum Backend {
Memory,
MemoryStorage,
Lmdb,
Redb,
Fjall,
Sqlite,
Geomerge,
}
impl Backend {
fn parse(s: &str) -> Option<Self> {
match s {
"memory" => Some(Self::Memory),
"memory-storage" => Some(Self::MemoryStorage),
"lmdb" => Some(Self::Lmdb),
"redb" => Some(Self::Redb),
"fjall" => Some(Self::Fjall),
"sqlite" => Some(Self::Sqlite),
"geomerge" => Some(Self::Geomerge),
_ => None,
}
}
}
const BACKENDS_HELP: &str = "memory|memory-storage|lmdb|redb|fjall|sqlite|geomerge";
fn main() -> ExitCode {
let mut backend = Backend::Memory;
let mut input_path: Option<String> = None;
let mut args = std::env::args().skip(1);
while let Some(arg) = args.next() {
match arg.as_str() {
"--backend" => {
let Some(value) = args.next() else {
eprintln!("--backend requires a value ({BACKENDS_HELP})");
return ExitCode::from(2);
};
let Some(parsed) = Backend::parse(&value) else {
eprintln!("unknown backend {value:?} (try {BACKENDS_HELP})");
return ExitCode::from(2);
};
backend = parsed;
}
other if input_path.is_none() => input_path = Some(other.to_string()),
other => {
eprintln!("unexpected argument: {other}");
return ExitCode::from(2);
}
}
}
let Some(path) = input_path else {
eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] <plan.json | ->");
return ExitCode::from(2);
};
let input = match read_input(&path) {
Ok(s) => s,
Err(err) => {
eprintln!("failed to read {path}: {err}");
return ExitCode::from(1);
}
};
let plan = match parse_plan(&input) {
Ok(p) => p,
Err(err) => {
eprintln!("parse error: {err}");
return ExitCode::from(1);
}
};
let tables = match build_tables_for(&plan, backend) {
Ok(t) => t,
Err(err) => {
eprintln!("{err}");
return ExitCode::from(1);
}
};
let relation = match execute(&tables, &plan.query) {
Ok(r) => r,
Err(err) => {
eprintln!("execute error: {err}");
return ExitCode::from(1);
}
};
let payload = serde_json::json!({
"columns": relation.columns,
"rows": relation
.rows
.iter()
.map(|row| row.iter().map(value_to_json).collect::<Vec<_>>())
.collect::<Vec<_>>(),
});
println!("{payload}");
ExitCode::SUCCESS
}
/// Build the input tables for `plan` using `backend`. Path-based adapters
/// allocate a fresh tempdir; it drops at the end of this function, which is
/// safe because `build_tables_via_storage` fully materializes the tables
/// into owned `Vec<Value>` before returning.
fn build_tables_for(plan: &Plan, backend: Backend) -> Result<HashMap<String, Table>, String> {
match backend {
Backend::Memory => build_tables(plan).map_err(|e| format!("build error: {e}")),
Backend::MemoryStorage => {
let mut storage = MemoryStorage::default();
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (memory-storage): {e}"))
}
Backend::Lmdb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = LmdbStorage::open(dir.path())
.map_err(|e| format!("failed to open lmdb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (lmdb): {e}"))
}
Backend::Redb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = RedbStorage::open(dir.path().join("data.redb"))
.map_err(|e| format!("failed to open redb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (redb): {e}"))
}
Backend::Fjall => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = FjallStorage::open(dir.path())
.map_err(|e| format!("failed to open fjall backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (fjall): {e}"))
}
Backend::Sqlite => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = SqliteStorage::open(dir.path().join("data.sqlite"))
.map_err(|e| format!("failed to open sqlite backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (sqlite): {e}"))
}
Backend::Geomerge => {
let relations = plan
.schema
.iter()
.map(|(name, &arity)| (name.clone(), infer_column_kinds(plan, name, arity)));
let mut storage = GeomergeStorage::with_relations(relations)
.map_err(|e| format!("failed to open geomerge backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
.map_err(|e| format!("build error (geomerge): {e}"))
}
}
}
/// Best-effort column type inference for `geomerge`'s synthesized theory.
/// The runner IR carries only arity, so we peek at the first fact row of
/// the relation. Columns without a sample default to `String`, which
/// matches every checked-in fixture (entity identities are encoded as
/// strings by the exporter).
fn infer_column_kinds(plan: &Plan, name: &str, arity: usize) -> Vec<ColumnKind> {
let mut kinds = vec![ColumnKind::String; arity];
let Some(rows) = plan.facts.get(name) else {
return kinds;
};
let Some(first) = rows.first() else {
return kinds;
};
for (i, cell) in first.iter().take(arity).enumerate() {
kinds[i] = match cell {
JsonValue::Int(_) => ColumnKind::Int,
JsonValue::Str(_) => ColumnKind::String,
};
}
kinds
}
fn read_input(path: &str) -> io::Result<String> {
if path == "-" {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf)?;
Ok(buf)
} else {
std::fs::read_to_string(path)
}
}
fn value_to_json(value: &Value) -> serde_json::Value {
match value {
Value::Int(n) => serde_json::Value::Number((*n).into()),
Value::Str(s) => serde_json::Value::String(s.clone()),
Value::Id(id) => serde_json::Value::String(id.to_string()),
}
}

View File

@ -0,0 +1,77 @@
//! Walks every JSON fixture under `crates/plan-runner/fixtures/` and
//! verifies it against the `expected_bindings` the exporter lifted from
//! the matching `tools/exporter/examples/*.scenario.json`. A fixture without an oracle
//! is reported as a failure (every checked-in fixture is expected to
//! carry one).
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use plan_runner::{parse_plan, run_json, verify};
fn fixtures_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}
fn collect_fixtures() -> BTreeMap<String, String> {
let mut out = BTreeMap::new();
for entry in fs::read_dir(fixtures_dir()).expect("read fixtures/") {
let path = entry.expect("dir entry").path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let name = path
.file_stem()
.and_then(|s| s.to_str())
.expect("ascii fixture name")
.to_string();
let contents = fs::read_to_string(&path).expect("read fixture");
out.insert(name, contents);
}
out
}
#[test]
fn every_fixture_runs_and_matches_its_oracle() {
let fixtures = collect_fixtures();
assert!(
!fixtures.is_empty(),
"no fixtures found in {}",
fixtures_dir().display()
);
let mut failures: Vec<String> = Vec::new();
for (name, json) in &fixtures {
let plan = match parse_plan(json) {
Ok(p) => p,
Err(err) => {
failures.push(format!("{name}: parse error: {err}"));
continue;
}
};
if plan.expected_bindings.is_none() {
failures.push(format!("{name}: fixture has no expected_bindings"));
continue;
}
let relation = match run_json(json) {
Ok(r) => r,
Err(err) => {
failures.push(format!("{name}: run error: {err}"));
continue;
}
};
match verify(&plan, &relation) {
Ok(true) => {}
Ok(false) => failures.push(format!("{name}: verify returned no-oracle unexpectedly")),
Err(err) => failures.push(format!("{name}: {err}")),
}
}
assert!(
failures.is_empty(),
"{} fixture(s) failed:\n {}",
failures.len(),
failures.join("\n ")
);
}

View File

@ -0,0 +1,52 @@
//! Cross-checks the two paths [`plan-runner`] exposes for materializing
//! input tables: the pure [`build_tables`] path and the [`Storage`]-routed
//! [`build_tables_via_storage`] path. Same fixture, same plan, must agree
//! row-for-row.
//!
//! This is the visible proof of the layer boundary: any new `Storage`
//! backend (LMDB, fjall, geomerge) keeps this test honest by re-running it
//! with a different `S`.
use plan_runner::{build_tables, build_tables_via_storage, execute, parse_plan, run_json};
use storage::MemoryStorage;
use storage::value::Value;
const FIXTURE: &str = include_str!("../fixtures/three_atom_chain.json");
#[test]
fn storage_backed_execution_matches_pure_path() {
let plan = parse_plan(FIXTURE).expect("parse plan");
let pure_tables = build_tables(&plan).expect("build_tables");
let pure = execute(&pure_tables, &plan.query).expect("pure execute");
let mut storage = MemoryStorage::default();
let storage_tables =
build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage");
let via_storage = execute(&storage_tables, &plan.query).expect("storage execute");
assert_eq!(pure.columns, via_storage.columns);
// Scan order between MemoryStorage and the direct-from-JSON path isn't
// required to match; compare rows as a multiset. `Value` is not `Ord`
// (it carries `RowId` and `String`), so use a Debug-derived sort key.
assert_eq!(sorted_rows(&pure.rows), sorted_rows(&via_storage.rows));
}
#[test]
fn storage_backed_execution_matches_run_json_oracle() {
let plan = parse_plan(FIXTURE).expect("parse plan");
let oracle = run_json(FIXTURE).expect("run_json");
let mut storage = MemoryStorage::default();
let tables = build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage");
let via_storage = execute(&tables, &plan.query).expect("storage execute");
assert_eq!(oracle.columns, via_storage.columns);
assert_eq!(sorted_rows(&oracle.rows), sorted_rows(&via_storage.rows));
}
fn sorted_rows(rows: &[Vec<Value>]) -> Vec<String> {
let mut keys: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
keys.sort();
keys
}

View File

@ -121,7 +121,7 @@ How it works (logically):
<div align="center"> <div align="center">
<picture> <picture>
<img alt="Types" src="docs/diagrams/workflow.svg" height="90%" width="90%%"> <img alt="Workflow" src="docs/diagrams/workflow.svg" height="90%" width="90%">
</picture> </picture>
</div> </div>

View File

@ -2,9 +2,9 @@
//! //!
//! Three operators are in scope: //! Three operators are in scope:
//! //!
//! - [`atom::scan_atom`] scans a [`Table`](storage::table::Table) under //! - [`atom::scan_atom`] scans a [`Table`] under an [`atom::AtomPattern`],
//! an [`atom::AtomPattern`], filtering for repeated-variable equality and //! filtering for repeated-variable equality and literal equality, and
//! literal equality, and outputs a binding [`relation::Relation`]. //! outputs a binding [`relation::Relation`].
//! - [`join::semijoin`] keeps rows of one relation whose shared-column values //! - [`join::semijoin`] keeps rows of one relation whose shared-column values
//! appear in another. //! appear in another.
//! - [`join::natural_join`] combines rows that agree on shared columns, //! - [`join::natural_join`] combines rows that agree on shared columns,
@ -14,10 +14,8 @@
//! is just an expression like //! is just an expression like
//! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`. //! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`.
//! //!
//! Foundational types [`Value`](storage::value::Value) and //! `Value` and `Table` live in the `storage` crate; consumers that build
//! [`Table`](storage::table::Table) live in `storage`, the //! inputs depend on `storage` directly.
//! storage-layer crate this crate is built on; storage backends produce
//! `Table`s that operators here consume.
pub mod atom; pub mod atom;
pub mod join; pub mod join;

View File

@ -24,7 +24,8 @@ This crates helps with decoupling the query execution logic from the underlying
| `adapters::redb::RedbStorage` | struct (feat) | Single-file B-tree backed `Storage`, behind the `redb` feature. Wraps `redb::WriteTransaction` for native atomic commits. | | `adapters::redb::RedbStorage` | struct (feat) | Single-file B-tree backed `Storage`, behind the `redb` feature. Wraps `redb::WriteTransaction` for native atomic commits. |
| `adapters::fjall::FjallStorage` | struct (feat) | LSM-tree backed `Storage`, behind the `fjall` feature. Each relation gets a partition; transactions buffer inserts and apply them on commit. | | `adapters::fjall::FjallStorage` | struct (feat) | LSM-tree backed `Storage`, behind the `fjall` feature. Each relation gets a partition; transactions buffer inserts and apply them on commit. |
| `adapters::lmdb::LmdbStorage` | struct (feat) | mmap'd B-tree backed `Storage`, behind the `lmdb` feature. Wraps `heed`'s `RwTxn` for native atomic commits. | | `adapters::lmdb::LmdbStorage` | struct (feat) | mmap'd B-tree backed `Storage`, behind the `lmdb` feature. Wraps `heed`'s `RwTxn` for native atomic commits. |
| `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). | | `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). Construct with `from_theory`, `from_store`, or `with_relations` (synthesizes a theory from `(name, Vec<ColumnKind>)` for callers that lack a typed schema). |
| `adapters::geomerge::ColumnKind` | enum (feat) | Primitive column type fed to `GeomergeStorage::with_relations`: `Int` maps to geomerge `PrimInt`, `String` maps to `PrimString`. Exists so callers can synthesize a theory without depending on `geolog-lang::ir` directly. |
Data types and their relationships: Data types and their relationships:

View File

@ -93,6 +93,17 @@ fn decode_pending_row_id(bytes: &[u8]) -> Result<TempRowId, StorageError> {
} }
/// Geomerge-backed [`Storage`] implementation. /// Geomerge-backed [`Storage`] implementation.
/// Primitive column type used by [`GeomergeStorage::with_relations`] to
/// synthesize a theory from an untyped `(name, arity)` schema. Geomerge
/// supports `PrimInt`, `PrimString`, and entity types; only the two
/// primitives are exposed here, since callers using this constructor by
/// definition don't carry entity-target information.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnKind {
Int,
String,
}
pub struct GeomergeStorage { pub struct GeomergeStorage {
store: Store, store: Store,
declared: HashSet<String>, declared: HashSet<String>,
@ -138,6 +149,52 @@ impl GeomergeStorage {
} }
} }
/// Build a store with a theory synthesized from a flat list of
/// `(relation_name, column_kinds)`. Each `ColumnKind` is mapped to the
/// matching `PrimType`. No entity columns and no laws are declared.
///
/// This is the convenience constructor for callers (e.g., the
/// `plan-runner` CLI) whose schema only carries arity plus a column-by-
/// column primitive-type guess taken from a sample row. It exists so
/// those callers don't have to depend on `geolog-lang::ir` directly.
///
/// # Errors
/// Returns [`StorageError::Backend`] if geomerge rejects the synthesized
/// theory.
pub fn with_relations<I, S>(relations: I) -> Result<Self, StorageError>
where
I: IntoIterator<Item = (S, Vec<ColumnKind>)>,
S: Into<String>,
{
let tables: Vec<ir::TableEntry> = relations
.into_iter()
.map(|(name, kinds)| {
let columns = kinds
.into_iter()
.map(|k| ir::ColType::PrimType {
prim: match k {
ColumnKind::Int => ir::PrimType::PrimInt,
ColumnKind::String => ir::PrimType::PrimString,
},
})
.collect();
let name: String = name.into();
ir::TableEntry {
path: name.into(),
table: ir::Schema {
columns,
primary_key: None,
},
}
})
.collect();
let theory = ir::FlatTheory {
tables,
laws: Vec::new(),
};
Self::from_theory(theory)
}
/// Borrow the underlying geomerge store (for backend-specific operations /// Borrow the underlying geomerge store (for backend-specific operations
/// like persistence, dump, or law inspection that aren't on the trait). /// like persistence, dump, or law inspection that aren't on the trait).
#[must_use] #[must_use]

View File

@ -154,12 +154,12 @@ impl Transaction for LmdbTx<'_> {
let Some(wtxn) = self.wtxn.as_ref() else { let Some(wtxn) = self.wtxn.as_ref() else {
unreachable!("transaction was already committed") unreachable!("transaction was already committed")
}; };
let raw = self let encoded = self
.meta .meta
.get(wtxn, name.as_bytes()) .get(wtxn, name.as_bytes())
.map_err(backend)? .map_err(backend)?
.ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
let entry = decode_meta(raw)?; let entry = decode_meta(encoded)?;
self.next_ids.insert(name.to_string(), entry); self.next_ids.insert(name.to_string(), entry);
entry entry
}; };

View File

@ -1,4 +1,4 @@
//! SQLite adapter via the `rusqlite` crate (bundled libsqlite3). //! `SQLite` adapter via the `rusqlite` crate (bundled libsqlite3).
//! //!
//! Storage layout: //! Storage layout:
//! //!
@ -35,13 +35,13 @@ CREATE TABLE IF NOT EXISTS __rows (
); );
"; ";
/// SQLite-backed [`Storage`] implementation. /// `SQLite`-backed [`Storage`] implementation.
pub struct SqliteStorage { pub struct SqliteStorage {
conn: Connection, conn: Connection,
} }
impl SqliteStorage { impl SqliteStorage {
/// Open or create a SQLite database at `path`. Pass `":memory:"` for /// Open or create a `SQLite` database at `path`. Pass `":memory:"` for
/// an in-process database (useful in tests). /// an in-process database (useful in tests).
/// ///
/// # Errors /// # Errors

View File

@ -7,7 +7,7 @@
-- against. -- against.
packages: packages:
glog-exporter.cabal plan-exporter.cabal
../../external/geolog/geolog-lang/geolog-lang.cabal ../../external/geolog/geolog-lang/geolog-lang.cabal
../../external/geolog/data-partition/data-partition.cabal ../../external/geolog/data-partition/data-partition.cabal
../../external/geolog/diagnostician/diagnostician.cabal ../../external/geolog/diagnostician/diagnostician.cabal

View File

@ -0,0 +1,31 @@
{
"name": "cartesian",
"_description": "Two disjoint atoms over different tables. Exercises the 'stray' branch of planConjunction's spanning forest (no shared variables = no edge in the intersection graph) and the linear chain of natural-joins that fullJoinForest emits over disconnected components.",
"schema": {
"left": { "columns": [{"entity": "left"}] },
"right": { "columns": [{"entity": "right"}] }
},
"facts": {
"left": [
[{"entity": ["left", 1]}],
[{"entity": ["left", 2]}]
],
"right": [
[{"entity": ["right", 10]}],
[{"entity": ["right", 20]}]
]
},
"atoms": [
{"table": "left", "values": {"0": {"var": "a"}}},
{"table": "right", "values": {"0": {"var": "b"}}}
],
"expected_bindings": {
"columns": ["a", "b"],
"rows": [
[{"entity": ["left", 1]}, {"entity": ["right", 10]}],
[{"entity": ["left", 1]}, {"entity": ["right", 20]}],
[{"entity": ["left", 2]}, {"entity": ["right", 10]}],
[{"entity": ["left", 2]}, {"entity": ["right", 20]}]
]
}
}

View File

@ -0,0 +1,24 @@
{
"name": "self-loop",
"_description": "Single-atom query with a repeated variable across two columns: edge(x, x, _). Exercises evalAtom's equality-enforcement path; the planner emits one PlanEvalAtom node and no joins.",
"schema": {
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 2]}, {"entity": ["edge", 2]}],
[{"entity": ["node", 3]}, {"entity": ["node", 3]}, {"entity": ["edge", 3]}]
]
},
"atoms": [
{"table": "edge", "values": {"0": {"var": "x"}, "1": {"var": "x"}}}
],
"expected_bindings": {
"columns": ["x"],
"rows": [
[{"entity": ["node", 2]}],
[{"entity": ["node", 3]}]
]
}
}

View File

@ -0,0 +1,29 @@
{
"name": "three-atom-chain",
"schema": {
"node": { "columns": [{"entity": "node"}] },
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"node": [
[{"entity": ["node", 1]}],
[{"entity": ["node", 2]}],
[{"entity": ["node", 3]}]
],
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 3]}, {"entity": ["edge", 2]}]
]
},
"atoms": [
{"table": "node", "values": {"0": {"var": "a"}}},
{"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}},
{"table": "edge", "values": {"0": {"var": "b"}, "1": {"var": "c"}}}
],
"expected_bindings": {
"columns": ["a", "b", "c"],
"rows": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["node", 3]}]
]
}
}

View File

@ -0,0 +1,28 @@
{
"name": "two-atom-join",
"schema": {
"node": { "columns": [{"entity": "node"}] },
"edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] }
},
"facts": {
"node": [
[{"entity": ["node", 1]}],
[{"entity": ["node", 2]}]
],
"edge": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}],
[{"entity": ["node", 2]}, {"entity": ["node", 1]}, {"entity": ["edge", 2]}]
]
},
"atoms": [
{"table": "node", "values": {"0": {"var": "a"}}},
{"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}}
],
"expected_bindings": {
"columns": ["a", "b"],
"rows": [
[{"entity": ["node", 1]}, {"entity": ["node", 2]}],
[{"entity": ["node", 2]}, {"entity": ["node", 1]}]
]
}
}

View File

@ -1,19 +1,20 @@
cabal-version: 3.4 cabal-version: 3.4
name: glog-exporter name: plan-exporter
version: 0.1.0.0 version: 0.1.0.0
license: MIT OR Apache-2.0 license: MIT OR Apache-2.0
author: storage-engine-playground author: storage-engine-playground
synopsis: Export geolog-lang join plans as JSON for the Rust runner. synopsis: Export conjunctive-query plans as JSON for the Rust plan-runner.
description: description:
Builds a FlatTheory + facts + a list of QAtoms for a named scenario, Reads a scenario (FlatTheory + facts + a list of QAtoms) from JSON,
runs Geolog.DB.Plan.planConjunction, and emits a JSON document that runs Geolog.DB.Plan.planConjunction, and emits a plan IR JSON document
crates/glog-runner consumes. This allows the playground use query-ops and that crates/plan-runner consumes. The IR is the contract between the
storage end-to-end with a real Yannakakis plan produced by the geolog Haskell frontend and the Rust executor; this tool is currently the only
frontend, not a hand-written fixture. producer, but any frontend that emits the same JSON shape can drive the
runner.
build-type: Simple build-type: Simple
executable glog-export executable plan-export
main-is: Main.hs main-is: Main.hs
hs-source-dirs: src hs-source-dirs: src
default-language: GHC2024 default-language: GHC2024
@ -32,5 +33,6 @@ executable glog-export
, base , base
, bytestring , bytestring
, containers , containers
, fnotation
, geolog-lang , geolog-lang
, text , text

View File

@ -1,31 +1,41 @@
-- | Exports a geolog-lang join plan as JSON for the Rust runner in -- | Reads a @.scenario.json@ example, plans its conjunction with
-- @crates/glog-runner@. -- @Geolog.DB.Plan.planConjunction@, and writes a runner-IR JSON plan that
-- @crates\/plan-runner@ consumes.
-- --
-- Invocation: -- Invocation:
-- --
-- @ -- @
-- cabal run glog-export -- <scenario> > plan.json -- cabal run plan-export -- <scenario.json>
-- @ -- @
-- --
-- Available scenarios: @three-atom-chain@. -- The scenario format is documented in @examples\/README@ or by example
-- (@examples\/*.scenario.json@); the output shape is documented in
-- @crates\/plan-runner\/src\/lib.rs@.
-- --
-- The output shape is documented in @crates\/glog-runner\/src\/lib.rs@. -- The exporter is also a self-check: before emitting, it runs the planned
-- This program is the canonical producer: any change to the IR should -- query through @evalConjunctionPlanned@ and verifies the bindings match
-- start here, with the Rust runner updated to match. -- the scenario's @expected_bindings@. A mismatched scenario fails loudly
-- here rather than handing a bad fixture to the Rust runner.
module Main (main) where module Main (main) where
import Algebra.Graph qualified as AG import Algebra.Graph qualified as AG
import Data.Aeson ((.=)) import Control.Monad (unless)
import Data.Aeson ((.!=), (.:), (.:?), (.=))
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.Aeson.Encode.Pretty qualified as AesonPretty import Data.Aeson.Encode.Pretty qualified as AesonPretty
import Data.Aeson.Key qualified as Key import Data.Aeson.Key qualified as Key
import Data.Aeson.KeyMap qualified as KM
import Data.Aeson.Types (Parser)
import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString.Lazy.Char8 qualified as LBS8
import Data.Foldable (toList)
import Data.List (sortOn) import Data.List (sortOn)
import Data.Map.Strict (Map) import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map import Data.Map.Strict qualified as Map
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.Text (Text) import Data.Text (Text)
import Data.Text qualified as T import Data.Text qualified as T
import Data.String (fromString)
import FNotation.Names (Name)
import Geolog.DB.InMemory import Geolog.DB.InMemory
import Geolog.DB.Plan import Geolog.DB.Plan
import Geolog.IR qualified as IR import Geolog.IR qualified as IR
@ -33,74 +43,142 @@ import System.Environment (getArgs)
import System.Exit (die) import System.Exit (die)
import System.IO (hPutStrLn, stderr) import System.IO (hPutStrLn, stderr)
-- * Scenario plumbing -- * Scenario file format
-- --
-- A scenario fixes a schema, a set of ground facts, and a conjunction of -- Mirrors @Geolog.IR.FlatTheory@ + @[(Path, [Val])]@ + @[QAtom]@. The
-- query atoms. The exporter is intentionally code-driven (not @.glog@ -- 'Expected' block is optional but, when present, the exporter cross-
-- driven): @.glog@ files declare theories, not queries, so the query -- checks it against the planner's own evaluation before emitting.
-- side has to live in Haskell either way.
data Scenario = Scenario data Scenario = Scenario
{ scName :: String { scName :: Text
, scTheory :: IR.FlatTheory , scSchema :: Map IR.Path SchemaEntry
, scFacts :: [(IR.Path, [Val])] , scFacts :: [(IR.Path, [Val])]
, scAtoms :: [QAtom] , scAtoms :: [QAtom]
, scExpected :: Maybe Expected
} }
deriving (Show)
-- * three-atom-chain data SchemaEntry = SchemaEntry
-- { seColumns :: [IR.ColType]
-- Mirrors @DB.InMemoryTest@ "matches evalConjunction on three-atom chain". , sePrimaryKey :: Maybe [Int]
-- node = {e1, e2, e3}, edge = {(e1,e2,ee1), (e2,e3,ee2)}. }
-- Conjunction: node(a), edge(a, b, _), edge(b, c, _). deriving (Show)
nodePath, edgePath :: IR.Path data Expected = Expected
nodePath = ["node"] { exColumns :: [Text]
edgePath = ["edge"] , exRows :: [[Val]]
}
deriving (Show)
threeAtomChain :: Scenario -- ** JSON parsers
threeAtomChain =
Scenario parsePath :: Aeson.Value -> Parser IR.Path
{ scName = "three-atom-chain" parsePath = Aeson.withText "path" \t -> pure [nameFromText t]
, scTheory =
IR.FlatTheory -- | Build a single-segment 'Name' from text. Multi-segment names (which
{ tables = -- would carry a non-empty 'init' field) aren't needed by any current
Map.fromList -- example; if a scenario wants @"a/b"@-style paths, extend this helper.
[ (nodePath, IR.Table {columns = [IR.EntityType nodePath], primaryKey = Nothing}) nameFromText :: Text -> Name
, (edgePath, IR.Table {columns = [IR.EntityType nodePath, IR.EntityType nodePath, IR.EntityType edgePath], primaryKey = Nothing}) nameFromText = fromString . T.unpack
]
, laws = Map.empty instance Aeson.FromJSON SchemaEntry where
} parseJSON = Aeson.withObject "SchemaEntry" \o ->
, scFacts = SchemaEntry <$> o .: "columns" <*> o .:? "primaryKey"
[ (nodePath, [ValEntity nodePath 1])
, (nodePath, [ValEntity nodePath 2]) instance Aeson.FromJSON IR.ColType where
, (nodePath, [ValEntity nodePath 3]) parseJSON = Aeson.withObject "ColType" \o -> do
, (edgePath, [ValEntity nodePath 1, ValEntity nodePath 2, ValEntity edgePath 1]) case KM.toList o of
, (edgePath, [ValEntity nodePath 2, ValEntity nodePath 3, ValEntity edgePath 2]) [("entity", v)] -> IR.EntityType <$> parsePath v
] [("prim", v)] -> IR.PrimType <$> parsePrim v
, scAtoms = _ -> fail "ColType: expected {\"entity\": <path>} or {\"prim\": \"int\"|\"string\"}"
[ QAtom {qaTable = nodePath, qaRowId = Nothing, qaValues = Map.singleton 0 (QVar (Var "a"))}
, QAtom {qaTable = edgePath, qaRowId = Nothing, qaValues = Map.fromList [(0, QVar (Var "a")), (1, QVar (Var "b"))]} parsePrim :: Aeson.Value -> Parser IR.PrimType
, QAtom {qaTable = edgePath, qaRowId = Nothing, qaValues = Map.fromList [(0, QVar (Var "b")), (1, QVar (Var "c"))]} parsePrim = Aeson.withText "prim type" \case
] "int" -> pure IR.PrimInt
"string" -> pure IR.PrimString
other -> fail ("unknown primitive type: " <> T.unpack other)
parseVal :: Aeson.Value -> Parser Val
parseVal = Aeson.withObject "Val" \o ->
case KM.toList o of
[("int", v)] -> ValInt <$> Aeson.parseJSON v
[("str", v)] -> ValText <$> Aeson.parseJSON v
[("entity", v)] -> parseEntity v
_ -> fail "Val: expected {\"int\": ..} | {\"str\": ..} | {\"entity\": [<path>, <id>]}"
where
parseEntity = Aeson.withArray "entity" \arr -> case toList arr of
[pv, nv] -> do
p <- parsePath pv
n <- Aeson.parseJSON nv
pure (ValEntity p n)
_ -> fail "entity: expected [<path>, <id>]"
parseQVal :: Aeson.Value -> Parser QVal
parseQVal = Aeson.withObject "QVal" \o ->
case KM.toList o of
[("var", v)] -> QVar . Var <$> Aeson.parseJSON v
[("lit", v)] -> QLit <$> parseVal v
_ -> fail "QVal: expected {\"var\": \"name\"} or {\"lit\": <value>}"
parseAtom :: Aeson.Value -> Parser QAtom
parseAtom = Aeson.withObject "QAtom" \o -> do
qaTable <- o .: "table" >>= parsePath
qaRowId <- o .:? "rowId" >>= traverse parseQVal
values <- o .: "values" :: Parser (Map Text Aeson.Value)
qaValues <-
Map.fromList
<$> traverse
( \(k, v) -> case reads (T.unpack k) of
[(i, "")] -> (i,) <$> parseQVal v
_ -> fail ("non-integer key in atom values: " <> T.unpack k)
)
(Map.toList values)
pure QAtom {qaTable, qaRowId, qaValues}
parseExpected :: Aeson.Value -> Parser Expected
parseExpected = Aeson.withObject "Expected" \o -> do
exColumns <- o .: "columns"
rawRows <- o .: "rows" :: Parser [[Aeson.Value]]
exRows <- traverse (traverse parseVal) rawRows
pure Expected {exColumns, exRows}
instance Aeson.FromJSON Scenario where
parseJSON = Aeson.withObject "Scenario" \o -> do
scName <- o .:? "name" .!= "unnamed"
rawSchema <- o .: "schema" :: Parser (Map Text SchemaEntry)
let scSchema = Map.fromList [([nameFromText k], v) | (k, v) <- Map.toList rawSchema]
rawFacts <- o .:? "facts" .!= mempty :: Parser (Map Text [[Aeson.Value]])
scFacts <-
concat
<$> traverse
( \(name, rows) -> do
let path = [nameFromText name]
parsedRows <- traverse (traverse parseVal) rows
pure [(path, row) | row <- parsedRows]
)
(Map.toList rawFacts)
rawAtoms <- o .: "atoms" :: Parser [Aeson.Value]
scAtoms <- traverse parseAtom rawAtoms
scExpected <- o .:? "expected_bindings" >>= traverse parseExpected
pure Scenario {scName, scSchema, scFacts, scAtoms, scExpected}
-- * Scenario → FlatTheory + DB + atoms
toFlatTheory :: Scenario -> IR.FlatTheory
toFlatTheory sc =
IR.FlatTheory
{ tables = Map.map (\e -> IR.Table {columns = seColumns e, primaryKey = sePrimaryKey e}) sc.scSchema
, laws = Map.empty
} }
scenarios :: [Scenario] populateDB :: Scenario -> DB
scenarios = [threeAtomChain] populateDB sc = foldl (\d (p, row) -> insertRow p row d) (fromTheory (toFlatTheory sc)) sc.scFacts
-- * JSON encoding -- * JSON encoding for the plan-runner IR
-- --
-- The shape mirrors the IR in @crates/glog-runner/src/lib.rs@: -- The shape is the same one we settled on earlier; see
-- -- @crates/plan-runner/src/lib.rs@.
-- > {
-- > "schema": {<name>: <arity>, ...},
-- > "facts": {<name>: [[<value>, ...], ...], ...},
-- > "query": {"root": <id>, "nodes": [{"id": <id>, "action": <action>}, ...]}
-- > }
-- | Render a 'Geolog.IR.Path' (a list of 'FNotation.Names.Name') as a flat
-- string for use as a relation name on the Rust side. Each 'Name' is
-- already shown with @\/@ between its own init segments and last, so we
-- reuse 'show' and join Names with @\/@ too.
pathText :: IR.Path -> Text pathText :: IR.Path -> Text
pathText = T.intercalate "/" . map (T.pack . show) pathText = T.intercalate "/" . map (T.pack . show)
@ -119,10 +197,6 @@ encodeTerm = \case
QVar (Var name) -> Aeson.object ["var" .= name] QVar (Var name) -> Aeson.object ["var" .= name]
QLit v -> Aeson.object ["lit" .= encodeValue v] QLit v -> Aeson.object ["lit" .= encodeValue v]
-- | Flatten an atom into one term per stored column, mirroring
-- @Geolog.DB.InMemory.toFlatArgs@: @qaValues@ keys map to positions
-- @0..n-2@, @qaRowId@ (if present) maps to position @n-1@, and any
-- missing positions become wildcard variables with locally-unique names.
flattenAtom :: Int -> Int -> QAtom -> [Aeson.Value] flattenAtom :: Int -> Int -> QAtom -> [Aeson.Value]
flattenAtom atomIdx arity qa = flattenAtom atomIdx arity qa =
[ encodeTerm (Map.findWithDefault (wildcard atomIdx pos) pos merged) [ encodeTerm (Map.findWithDefault (wildcard atomIdx pos) pos merged)
@ -145,9 +219,6 @@ encodeAtom tables atomIdx qa =
Just t -> length t.columns Just t -> length t.columns
Nothing -> error ("encodeAtom: unknown table " <> show qa.qaTable) Nothing -> error ("encodeAtom: unknown table " <> show qa.qaTable)
-- | Stable atom indexing keyed by atom identity, so the wildcard names in
-- @flattenAtom@ are deterministic across runs even if the planner's node
-- ordering changes.
atomIndex :: [QAtom] -> Map QAtom Int atomIndex :: [QAtom] -> Map QAtom Int
atomIndex atoms = Map.fromList (zip (Set.toList (Set.fromList atoms)) [0 ..]) atomIndex atoms = Map.fromList (zip (Set.toList (Set.fromList atoms)) [0 ..])
@ -176,9 +247,6 @@ encodeNode tables idx n =
] ]
] ]
-- | Render a 'PlanGraph' as the JSON the runner consumes. Empty graphs
-- produce @{"root": 0, "nodes": []}@, which the runner treats as a
-- well-formed but empty query.
encodeQuery :: Map IR.Path IR.Table -> Map QAtom Int -> PlanGraph -> Aeson.Value encodeQuery :: Map IR.Path IR.Table -> Map QAtom Int -> PlanGraph -> Aeson.Value
encodeQuery tables idx (PlanGraph g) encodeQuery tables idx (PlanGraph g)
| null nodes = | null nodes =
@ -192,24 +260,30 @@ encodeQuery tables idx (PlanGraph g)
nodes = sortOn (.graphId.unPlanNodeId) (AG.vertexList g) nodes = sortOn (.graphId.unPlanNodeId) (AG.vertexList g)
rootId = case graphRoot (PlanGraph g) of rootId = case graphRoot (PlanGraph g) of
Just (PlanNodeId i) -> i Just (PlanNodeId i) -> i
-- Non-empty graph with no topological root means a cycle, which
-- planConjunction never produces. Fall back to the last id rather
-- than crashing so a bug here is still inspectable.
Nothing -> (.graphId.unPlanNodeId) (last nodes) Nothing -> (.graphId.unPlanNodeId) (last nodes)
encodeExpected :: Expected -> Aeson.Value
encodeExpected ex =
Aeson.object
[ "columns" .= exColumns ex
, "rows" .= map (map encodeValue) (exRows ex)
]
encodePlan :: Scenario -> Aeson.Value encodePlan :: Scenario -> Aeson.Value
encodePlan sc = encodePlan sc =
Aeson.object Aeson.object
[ "_scenario" .= sc.scName ( [ "_scenario" .= scName sc
, "schema" .= Aeson.object , "schema" .= Aeson.object [pathKey p .= length (seColumns t) | (p, t) <- Map.toList sc.scSchema]
[pathKey p .= length t.columns | (p, t) <- Map.toList sc.scTheory.tables] , "facts"
, "facts" .= Aeson.object .= Aeson.object
[pathKey p .= map (map encodeValue) rows | (p, rows) <- groupedFacts sc.scFacts] [ pathKey p .= map (map encodeValue) rows
, "query" .= encodeQuery sc.scTheory.tables (atomIndex sc.scAtoms) (planConjunction sc.scAtoms) | (p, rows) <- groupedFacts sc.scFacts
] ]
, "query" .= encodeQuery (toFlatTheory sc).tables (atomIndex sc.scAtoms) (planConjunction sc.scAtoms)
]
++ maybe [] (\e -> ["expected_bindings" .= encodeExpected e]) sc.scExpected
)
-- | Group facts by table while preserving table-first-seen order and
-- per-table insertion order.
groupedFacts :: [(IR.Path, [Val])] -> [(IR.Path, [[Val]])] groupedFacts :: [(IR.Path, [Val])] -> [(IR.Path, [[Val]])]
groupedFacts = go [] groupedFacts = go []
where where
@ -222,17 +296,45 @@ groupedFacts = go []
-- * Self-check -- * Self-check
-- --
-- Run the planner's @evalConjunctionPlanned@ against the scenario's DB -- Cross-check the planned bindings against any user-supplied
-- to confirm the plan we're about to emit is well-formed and produces -- 'expected_bindings'. Detects two classes of bug before they reach the
-- non-error output. Catches malformed scenarios before they hand a bad -- Rust side: a scenario whose 'expected' is wrong, and a planner output
-- plan to the Rust runner. -- that disagrees with 'evalConjunction'.
selfCheck :: Scenario -> IO () selfCheck :: Scenario -> IO ()
selfCheck sc = do selfCheck sc = do
let db = foldl (\d (p, row) -> insertRow p row d) (fromTheory sc.scTheory) sc.scFacts let db = populateDB sc
case evalConjunctionPlanned db sc.scAtoms of case evalConjunctionPlanned db sc.scAtoms of
Left err -> die ("self-check failed for " <> sc.scName <> ": " <> show err) Left err -> die ("self-check failed for " <> T.unpack sc.scName <> ": " <> show err)
Right _ -> pure () Right actual -> case sc.scExpected of
Nothing -> pure ()
Just expected -> verifyAgainstExpected sc.scName expected actual
verifyAgainstExpected :: Text -> Expected -> Bindings -> IO ()
verifyAgainstExpected name expected actual = do
let actualCols = actual.cols
expectedCols = Set.fromList (map Var (exColumns expected))
unless (Set.isSubsetOf expectedCols actualCols) $
die $
"self-check failed for "
<> T.unpack name
<> ": expected_bindings names columns not produced by the plan: "
<> show (Set.difference expectedCols actualCols)
let projectedActual = Set.map (`projectOn` exColumns expected) actual.table
expectedProjected = Set.fromList (map (zip (exColumns expected)) (exRows expected))
expectedSet = Set.map (Map.fromList . map (\(v, x) -> (Var v, x))) expectedProjected
unless (projectedActual == expectedSet) $
die $
"self-check failed for "
<> T.unpack name
<> ":\n expected: "
<> show expectedSet
<> "\n actual: "
<> show projectedActual
projectOn :: Map Var Val -> [Text] -> Map Var Val
projectOn row keys =
Map.fromList [(Var k, v) | k <- keys, Just v <- [Map.lookup (Var k) row]]
-- * Entry point -- * Entry point
@ -240,13 +342,13 @@ main :: IO ()
main = do main = do
args <- getArgs args <- getArgs
case args of case args of
[name] -> case lookup name [(s.scName, s) | s <- scenarios] of [path] -> do
Just sc -> do raw <- LBS8.readFile path
selfCheck sc sc <- case Aeson.eitherDecode raw of
LBS8.putStrLn (AesonPretty.encodePretty (encodePlan sc)) Left err -> die ("failed to parse " <> path <> ": " <> err)
Nothing -> Right sc -> pure sc
die ("unknown scenario: " <> name <> "\navailable: " <> unwords (map (.scName) scenarios)) selfCheck sc
LBS8.putStrLn (AesonPretty.encodePretty (encodePlan sc))
_ -> do _ -> do
hPutStrLn stderr "usage: glog-export <scenario>" hPutStrLn stderr "usage: plan-export <scenario.json>"
hPutStrLn stderr ("scenarios: " <> unwords (map (.scName) scenarios))
die "" die ""