diff --git a/.editorconfig b/.editorconfig index 8958945..9dd1efb 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,7 +8,7 @@ indent_size = 4 insert_final_newline = true trim_trailing_whitespace = true -[*.rs] +[*.{rs,hs,py}] max_line_length = 100 [*.md] diff --git a/AGENTS.md b/AGENTS.md index 44cf7ae..26b91bf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -50,7 +50,7 @@ Expected durable areas may include: - `src/`: Rust source for parser, catalog, planner, execution experiments, and storage prototypes. - `tests/`: integration tests for rule planning, evaluation, and storage behavior. -- `examples/`: small runnable Datalog-like programs or storage scenarios. +- `tools/exporter/examples/`: hand-authored scenario JSON consumed by the Haskell exporter to produce runner fixtures. - `fixtures/`: committed input facts and expected outputs. - `notes/`: local design notes that belong to this project. - `flowlog/`: project-local notes or sketches derived from the FlowLog line of work. diff --git a/Cargo.lock b/Cargo.lock index 4e72142..00c6fbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,16 +555,6 @@ dependencies = [ "wasip3", ] -[[package]] -name = "glog-runner" -version = "0.1.0" -dependencies = [ - "query-ops", - "serde", - "serde_json", - "storage", -] - [[package]] name = "guardian" version = "1.3.0" @@ -1156,6 +1146,17 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "plan-runner" +version = "0.1.0" +dependencies = [ + "query-ops", + "serde", + "serde_json", + "storage", + "tempfile", +] + [[package]] name = "plotters" version = "0.3.7" diff --git a/Makefile b/Makefile index bb4bb46..3af3df8 100644 --- a/Makefile +++ b/Makefile @@ -77,22 +77,28 @@ clean: ## Remove build output fi EXPORTER_DIR := tools/exporter -EXPORTER_FIXTURES := crates/glog-runner/fixtures -EXPORTER_SCENARIOS := three-atom-chain +EXPORTER_FIXTURES := crates/plan-runner/fixtures +EXAMPLES_DIR := $(EXPORTER_DIR)/examples .PHONY: export-fixtures -export-fixtures: ## Regenerate JSON plan fixtures from the Haskell exporter (needs Cabal and GHC; use `make shell` first). +export-fixtures: ## Regenerate plan JSON for every tools/exporter/examples/*.scenario.json (needs Cabal and GHC; use `make shell` first). @if ! command -v cabal >/dev/null 2>&1; then \ echo "cabal not found. Enter the dev shell with 'make shell' (or 'nix develop') first."; \ exit 1; \ fi - @cd $(EXPORTER_DIR) && cabal build glog-export - @for sc in $(EXPORTER_SCENARIOS); do \ - out=$(EXPORTER_FIXTURES)/$$(echo $$sc | tr '-' '_').json; \ + @cd $(EXPORTER_DIR) && cabal build plan-export + @mkdir -p $(EXPORTER_FIXTURES) + @for sc in $(EXAMPLES_DIR)/*.scenario.json; do \ + base=$$(basename $$sc .scenario.json); \ + out=$(EXPORTER_FIXTURES)/$$base.json; \ echo "exporting $$sc -> $$out"; \ - (cd $(EXPORTER_DIR) && cabal run -v0 glog-export -- $$sc) > $$out; \ + (cd $(EXPORTER_DIR) && cabal run -v0 plan-export -- examples/$$base.scenario.json) > $$out; \ done +.PHONY: examples +examples: export-fixtures ## Regenerate fixtures from scenarios and run them through plan-runner against their oracles. + @cargo test -p plan-runner --test examples + .PHONY: shell shell: ## Enter the Nix dev shell defined in flake.nix @nix develop diff --git a/crates/glog-runner/src/lib.rs b/crates/glog-runner/src/lib.rs deleted file mode 100644 index c7da6e4..0000000 --- a/crates/glog-runner/src/lib.rs +++ /dev/null @@ -1,344 +0,0 @@ -//! End-to-end runner that executes a `geolog-lang` conjunctive-query plan -//! against this workspace's storage and `query-ops` operators. -//! -//! The upstream Haskell planner in `external/geolog/geolog-lang` -//! (`Geolog.DB.Plan`) builds a Yannakakis-style join DAG over `QAtom`s. This -//! crate accepts that DAG as JSON, materializes the input relations through -//! the [`Storage`] trait, and walks the DAG using -//! [`query_ops::atom::scan_atom`], [`query_ops::join::semijoin`], and -//! [`query_ops::join::natural_join`]. The result is a binding -//! [`Relation`](query_ops::relation::Relation) over the query's variables. -//! -//! The JSON IR mirrors `Geolog.DB.Plan.JoinPlan` and `Geolog.DB.InMemory.QAtom` -//! without depending on the Haskell side at build time. A Haskell exporter -//! that dumps `(schema, facts, JoinPlan)` to this shape is the planned -//! follow-up that completes the round trip; the IR is the contract. -//! -//! Mapping from the Haskell planner: -//! -//! | `Geolog.DB.Plan` | this crate | -//! |-----------------------------|-----------------------------------------------| -//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` | -//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` | -//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` | -//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` | -//! -//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`] -//! repeated across positions enforces equality, [`Term::Lit`] filters by -//! constant, and distinct variables project in first-occurrence order. - -use std::collections::HashMap; - -use serde::Deserialize; - -use query_ops::atom::{AtomPattern, Term, scan_atom}; -use query_ops::join::{natural_join, semijoin}; -use query_ops::relation::Relation; -use storage::value::Value; -use storage::{MemoryStorage, Storage, StorageError, scan_as_table}; - -/// A single fixture: schema, ground facts, and a query plan to execute. -#[derive(Debug, Clone, Deserialize)] -pub struct Plan { - /// Relation name → arity (column count). - pub schema: HashMap, - /// Relation name → list of ground tuples to insert before execution. - pub facts: HashMap>>, - /// The join DAG itself. - pub query: Query, -} - -/// Mirrors `Geolog.DB.Plan.JoinPlan`: a set of nodes plus the id of the -/// rooted result node. -#[derive(Debug, Clone, Deserialize)] -pub struct Query { - pub root: u32, - pub nodes: Vec, -} - -/// One node of the plan DAG. `id`s are dense within a `Query` but don't need -/// to start at any particular value, mirroring the Haskell `PlanNodeId`. -#[derive(Debug, Clone, Deserialize)] -pub struct Node { - pub id: u32, - pub action: Action, -} - -/// What to compute at a node. Tagged externally so JSON reads as -/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`. -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum Action { - Scan(Atom), - Join(Join), -} - -/// A flat atom pattern, one entry per column of the target relation. -/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`: -/// `qaValues` positions are filled in directly, and the entity-id column -/// (if any) appears at the last position. Wildcard positions in the -/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a -/// fresh, unique variable name on this side, which the operator binds but -/// never joins against. -#[derive(Debug, Clone, Deserialize)] -pub struct Atom { - pub table: String, - pub columns: Vec, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum JsonTerm { - Var(String), - Lit(JsonValue), -} - -/// Wire-level value tag. Restricted to what `storage::value::Value` carries. -/// Entity identities from the Haskell side (`ValEntity path id`) round-trip -/// through `Str` for now using a `"path:id"` convention; that's a fixture -/// concern, not a runner concern. -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum JsonValue { - Int(i64), - Str(String), -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Join { - pub op: JoinOp, - pub left: u32, - pub right: u32, -} - -#[derive(Debug, Clone, Copy, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum JoinOp { - /// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns - /// appear in `right`. Lowered to `semijoin(left, right)`. - Left, - /// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared - /// columns appear in `left`. Lowered to `semijoin(right, left)`. - Right, - /// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`. - Natural, -} - -/// Errors a runner can produce in addition to storage failures. -#[derive(Debug)] -pub enum RunError { - /// A fact references a relation that isn't declared in `schema`. - UnknownRelation(String), - /// A node id appears in a `Join` action but no node with that id exists. - MissingNode(u32), - /// `Query.root` doesn't match any node in `nodes`. - MissingRoot(u32), - /// Two nodes share the same id. - DuplicateNode(u32), - /// A join node references its left or right side before that side has - /// been computed: the DAG isn't actually topologically sorted by id, or - /// it has a cycle. - UnresolvedDependency { node: u32, depends_on: u32 }, - /// Storage layer rejected an operation. - Storage(StorageError), -} - -impl std::fmt::Display for RunError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::UnknownRelation(name) => { - write!(f, "facts reference relation {name:?} not in schema") - } - Self::MissingNode(id) => write!(f, "plan references missing node id {id}"), - Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"), - Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"), - Self::UnresolvedDependency { node, depends_on } => write!( - f, - "node {node} depends on {depends_on}, which has not been computed yet" - ), - Self::Storage(err) => write!(f, "storage error: {err}"), - } - } -} - -impl std::error::Error for RunError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - Self::Storage(err) => Some(err), - _ => None, - } - } -} - -impl From for RunError { - fn from(err: StorageError) -> Self { - Self::Storage(err) - } -} - -impl From for Value { - fn from(jv: JsonValue) -> Self { - match jv { - JsonValue::Int(n) => Self::Int(n), - JsonValue::Str(s) => Self::Str(s), - } - } -} - -impl From for Term { - fn from(t: JsonTerm) -> Self { - match t { - JsonTerm::Var(name) => Self::Var(name), - JsonTerm::Lit(value) => Self::Lit(value.into()), - } - } -} - -/// Parse a [`Plan`] from a JSON string. -/// -/// # Errors -/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the -/// expected shape. -pub fn parse_plan(json: &str) -> Result { - serde_json::from_str(json) -} - -/// Load schema and facts from a [`Plan`] into a fresh [`MemoryStorage`]. -/// -/// All facts are inserted in a single transaction; commit is atomic so a -/// failure on row N leaves the storage empty. -/// -/// # Errors -/// Returns [`RunError::UnknownRelation`] if facts mention a relation not -/// declared in `schema`. Wraps storage failures (arity mismatch, transaction -/// errors) in [`RunError::Storage`]. -pub fn load_into_memory(plan: &Plan) -> Result { - let mut storage = MemoryStorage::default(); - for (name, arity) in &plan.schema { - storage.create_relation(name, *arity)?; - } - { - let mut tx = storage.transaction()?; - for (name, rows) in &plan.facts { - if !plan.schema.contains_key(name) { - return Err(RunError::UnknownRelation(name.clone())); - } - for row in rows { - let cells: Vec = row.iter().cloned().map(Value::from).collect(); - tx.insert(name, cells)?; - } - } - let _ = tx.commit()?; - } - Ok(storage) -} - -/// Execute a plan against a storage backend, returning the bindings -/// [`Relation`] for the rooted plan node. -/// -/// Nodes are executed in ascending `id` order. For a Yannakakis plan as -/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort, -/// since `insertJoin` only references node ids that have already been -/// allocated. A non-monotone id ordering is rejected with -/// [`RunError::UnresolvedDependency`]. -/// -/// # Errors -/// Returns [`RunError::DuplicateNode`] for repeated ids, -/// [`RunError::MissingNode`] for join references to unknown ids, -/// [`RunError::MissingRoot`] if `query.root` isn't present, and storage -/// errors during the per-scan `scan_as_table` call. -pub fn execute(storage: &S, query: &Query) -> Result { - let mut seen_ids: std::collections::HashSet = - std::collections::HashSet::with_capacity(query.nodes.len()); - for node in &query.nodes { - if !seen_ids.insert(node.id) { - return Err(RunError::DuplicateNode(node.id)); - } - } - if !seen_ids.contains(&query.root) { - return Err(RunError::MissingRoot(query.root)); - } - - let mut ordered: Vec<&Node> = query.nodes.iter().collect(); - ordered.sort_by_key(|n| n.id); - - let mut results: HashMap = HashMap::with_capacity(ordered.len()); - for node in ordered { - let computed = match &node.action { - Action::Scan(atom) => { - let table = scan_as_table(storage, &atom.table)?; - let pattern = AtomPattern { - columns: atom.columns.iter().cloned().map(Term::from).collect(), - }; - scan_atom(&table, &pattern) - } - Action::Join(join) => { - let left = require_dep(&results, &seen_ids, node.id, join.left)?; - let right = require_dep(&results, &seen_ids, node.id, join.right)?; - match join.op { - JoinOp::Left => semijoin(left, right), - JoinOp::Right => semijoin(right, left), - JoinOp::Natural => natural_join(left, right), - } - } - }; - results.insert(node.id, computed); - } - - results - .remove(&query.root) - .ok_or(RunError::MissingRoot(query.root)) -} - -fn require_dep<'a>( - results: &'a HashMap, - seen: &std::collections::HashSet, - node: u32, - depends_on: u32, -) -> Result<&'a Relation, RunError> { - if let Some(rel) = results.get(&depends_on) { - Ok(rel) - } else if seen.contains(&depends_on) { - Err(RunError::UnresolvedDependency { node, depends_on }) - } else { - Err(RunError::MissingNode(depends_on)) - } -} - -/// Convenience: parse JSON, load it into a fresh in-memory storage, and -/// execute, returning the root binding relation. -/// -/// # Errors -/// Returns a JSON parse error if the input is malformed, or a [`RunError`] -/// for any later step. -pub fn run_json(json: &str) -> Result { - let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?; - let storage = load_into_memory(&plan).map_err(RunFromJsonError::Run)?; - let bindings = execute(&storage, &plan.query).map_err(RunFromJsonError::Run)?; - Ok(bindings) -} - -/// Combined error from [`run_json`]. -#[derive(Debug)] -pub enum RunFromJsonError { - Parse(serde_json::Error), - Run(RunError), -} - -impl std::fmt::Display for RunFromJsonError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Parse(err) => write!(f, "parse error: {err}"), - Self::Run(err) => write!(f, "run error: {err}"), - } - } -} - -impl std::error::Error for RunFromJsonError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - Self::Parse(err) => Some(err), - Self::Run(err) => Some(err), - } - } -} diff --git a/crates/glog-runner/src/main.rs b/crates/glog-runner/src/main.rs deleted file mode 100644 index 2c121ec..0000000 --- a/crates/glog-runner/src/main.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! `glog-run` CLI: read a JSON plan from a file (or stdin if `-`), execute -//! it against a fresh in-memory store, and print the resulting binding -//! relation as JSON on stdout. - -use std::io::{self, Read}; -use std::process::ExitCode; - -fn main() -> ExitCode { - let mut args = std::env::args().skip(1); - let Some(path) = args.next() else { - eprintln!("usage: glog-run "); - return ExitCode::from(2); - }; - - let input = match read_input(&path) { - Ok(s) => s, - Err(err) => { - eprintln!("failed to read {path}: {err}"); - return ExitCode::from(1); - } - }; - - let relation = match glog_runner::run_json(&input) { - Ok(r) => r, - Err(err) => { - eprintln!("{err}"); - return ExitCode::from(1); - } - }; - - let payload = serde_json::json!({ - "columns": relation.columns, - "rows": relation - .rows - .iter() - .map(|row| row.iter().map(value_to_json).collect::>()) - .collect::>(), - }); - println!("{payload}"); - ExitCode::SUCCESS -} - -fn read_input(path: &str) -> io::Result { - if path == "-" { - let mut buf = String::new(); - io::stdin().read_to_string(&mut buf)?; - Ok(buf) - } else { - std::fs::read_to_string(path) - } -} - -fn value_to_json(value: &storage::value::Value) -> serde_json::Value { - match value { - storage::value::Value::Int(n) => serde_json::Value::Number((*n).into()), - storage::value::Value::Str(s) => serde_json::Value::String(s.clone()), - storage::value::Value::Id(id) => serde_json::Value::String(id.to_string()), - } -} diff --git a/crates/glog-runner/tests/three_atom_chain.rs b/crates/glog-runner/tests/three_atom_chain.rs deleted file mode 100644 index 9a318ea..0000000 --- a/crates/glog-runner/tests/three_atom_chain.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! End-to-end check: run the JSON fixture and verify the resulting bindings -//! match the `DB.InMemoryTest` "matches evalConjunction on three-atom chain" -//! case from `external/geolog/geolog-lang/test/DB/InMemoryTest.hs`. -//! -//! For `node = {e1, e2, e3}` and `edge = {(e1,e2,ee1), (e2,e3,ee2)}` the -//! conjunction `node(a), edge(a, b, _), edge(b, c, _)` has exactly one -//! solution: `(a=e1, b=e2, c=e3)`. - -use std::collections::BTreeMap; - -use glog_runner::run_json; -use storage::value::Value; - -fn fixture() -> &'static str { - include_str!("../fixtures/three_atom_chain.json") -} - -fn ent(path: &str, id: u32) -> Value { - Value::Str(format!("{path}:{id}")) -} - -fn project<'a>( - columns: &'a [String], - row: &'a [Value], - keep: &'a [&'a str], -) -> BTreeMap<&'a str, &'a Value> { - keep.iter() - .map(|name| { - let pos = columns - .iter() - .position(|c| c == name) - .expect("column missing"); - (*name, &row[pos]) - }) - .collect() -} - -#[test] -fn three_atom_chain_matches_haskell_oracle() { - let result = run_json(fixture()).expect("fixture should execute"); - - // The plan's root keeps every variable, including the per-atom wildcards - // `_r1` and `_r2`. The oracle only asserts the (a, b, c) projection. - let keep = ["a", "b", "c"]; - let mut projected: Vec> = result - .rows - .iter() - .map(|row| project(&result.columns, row, &keep)) - .collect(); - projected.sort_by_key(|m| format!("{m:?}")); - - let e1 = ent("node", 1); - let e2 = ent("node", 2); - let e3 = ent("node", 3); - let expected = vec![BTreeMap::from([("a", &e1), ("b", &e2), ("c", &e3)])]; - - assert_eq!(projected, expected); -} - -#[test] -fn root_columns_cover_a_b_c_plus_two_wildcards() { - // The exporter emits unique wildcard variable names for the entity-id - // column of each edge atom (e.g. `_w0_2`, `_w1_2`); their exact spelling - // is an implementation detail of the exporter, so this test only checks - // that the named variables are all present and that the total column - // count is the three named ones plus two anonymous wildcards. - let result = run_json(fixture()).expect("fixture should execute"); - let cols: std::collections::HashSet<&str> = result.columns.iter().map(String::as_str).collect(); - for expected in ["a", "b", "c"] { - assert!(cols.contains(expected), "missing column {expected}"); - } - assert_eq!(result.columns.len(), 5, "expected 3 named + 2 wildcards"); -} diff --git a/crates/glog-runner/Cargo.toml b/crates/plan-runner/Cargo.toml similarity index 62% rename from crates/glog-runner/Cargo.toml rename to crates/plan-runner/Cargo.toml index 29b75d5..5663f3f 100644 --- a/crates/glog-runner/Cargo.toml +++ b/crates/plan-runner/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "glog-runner" +name = "plan-runner" version = "0.1.0" edition.workspace = true license.workspace = true @@ -9,11 +9,18 @@ rust-version.workspace = true workspace = true [dependencies] -storage = { path = "../storage" } query-ops = { path = "../query-ops" } +storage = { path = "../storage", features = [ + "lmdb", + "redb", + "fjall", + "sqlite", + "geomerge", +] } serde = { version = "1", features = ["derive"] } serde_json = "1" +tempfile = "3" [[bin]] -name = "glog-run" +name = "plan-run" path = "src/main.rs" diff --git a/crates/plan-runner/README.md b/crates/plan-runner/README.md new file mode 100644 index 0000000..6c0c463 --- /dev/null +++ b/crates/plan-runner/README.md @@ -0,0 +1,122 @@ +## Plan Runner + +This crate is a snapshot executor for conjunctive-query plans. +It reads a JSON plan (a DAG of scan and join nodes plus the input facts), +walks the DAG using the operators from [`query-ops`](../query-ops), +and prints the binding relation produced at the root node. + +The wire format mirrors `Geolog.DB.Plan.PlanGraph` from the +[`geolog`](../../external/geolog) submodule, but the JSON shape is the contract: +any frontend that emits this format can drive the runner. +The mapping from `PlanEvalAtom` / `PlanJoin` to `scan_atom` / `semijoin` / `natural_join`, +and the full IR spec, are documented as module-level rustdoc in +[`src/lib.rs`](src/lib.rs). + +### Pipeline + +End-to-end, scenarios become runner output through three stages: + +```text +tools/exporter/examples/*.scenario.json + └── (Haskell exporter; runs Geolog.DB.Plan.planConjunction + and Geolog.DB.InMemory.evalConjunctionPlanned as a self-check) + └── crates/plan-runner/fixtures/*.json (JSON IR; checked in) + └── (plan-runner; this crate) + └── stdout JSON, with row-for-row oracle check +``` + +The exporter (`tools/exporter`) is the only producer of runner IR today; +it's where atoms are planned and rejected if they don't fit the supported subset. +Fixtures are regenerated with `make export-fixtures`, and the full loop is `make examples`. + +### Backends + +The CLI takes a `--backend` flag. +The `memory` backend is the pure in-memory path; +every other backend routes facts through the [`Storage`](../storage) trait +via `build_tables_via_storage`, then scans tables back out before executing. + +| Backend | Storage | Location | +|------------------|--------------------------------------------------|--------------------------------| +| `memory` | none (direct from `plan.facts`) | n/a | +| `memory-storage` | `MemoryStorage` | in-process | +| `lmdb` | `LmdbStorage` (heed-backed mmap B-tree) | fresh tempdir per run | +| `redb` | `RedbStorage` (single-file B-tree) | fresh tempdir per run | +| `fjall` | `FjallStorage` (LSM tree) | fresh tempdir per run | +| `sqlite` | `SqliteStorage` (rusqlite, bundled libsqlite3) | fresh tempdir per run | +| `geomerge` | `GeomergeStorage` (CRDT; alpha) | in-process | + +All seven produce byte-identical output for every checked-in fixture. +The point of the abstraction is not performance comparison +(the snapshot evaluator is bulk-materialized either way), +but to validate that the storage layer is genuinely backend-neutral +and that adding a new adapter is a constructor swap. + +Note on `geomerge`: +the runner's JSON IR is untyped (only arity per relation), +but geomerge requires a typed theory upfront. +The CLI infers column types from the first fact row per relation +and synthesizes a theory of `PrimInt` and `PrimString` columns via +[`GeomergeStorage::with_relations`](../storage/src/adapters/geomerge.rs). +Columns with no sample facts default to `PrimString`. + +### Run It + +```sh +# Run one fixture through the default in-memory path: +cargo run -p plan-runner -- crates/plan-runner/fixtures/two_atom_join.json + +# Same plan, routed through different backends: +cargo run -p plan-runner -- --backend memory-storage crates/plan-runner/fixtures/two_atom_join.json +cargo run -p plan-runner -- --backend lmdb crates/plan-runner/fixtures/two_atom_join.json +cargo run -p plan-runner -- --backend redb crates/plan-runner/fixtures/two_atom_join.json +cargo run -p plan-runner -- --backend fjall crates/plan-runner/fixtures/two_atom_join.json +cargo run -p plan-runner -- --backend sqlite crates/plan-runner/fixtures/two_atom_join.json +cargo run -p plan-runner -- --backend geomerge crates/plan-runner/fixtures/two_atom_join.json + +# Regenerate every fixture from its scenario and run the oracle test: +make examples +``` + +A sample run: + +```sh +$ plan-run crates/plan-runner/fixtures/two_atom_join.json +{"columns":["a","b","_w0_2"],"rows":[["node:1","node:2","edge:1"],["node:2","node:1","edge:2"]]} +``` + +The `_w_` columns are wildcards the exporter named so the runner can bind them. +The scenario's `expected_bindings` block names only the variables the test cares about, +and `verify` projects the runner output to that subset before comparing as a multiset. + +### Run the Tests + +```sh +cargo test -p plan-runner +``` + +The two integration test files exercise complementary properties: + +- `tests/examples.rs` walks every fixture and checks it against its `expected_bindings` oracle. +- `tests/storage_roundtrip.rs` cross-checks the pure path against the storage-backed path, + to keep `build_tables` and `build_tables_via_storage` in lockstep. + +### Notes + +- **IR contract.** + The runner is backend-agnostic and frontend-agnostic: + it consumes JSON in the shape documented in `src/lib.rs` and produces a binding relation. + Anything that emits the same JSON can drive it. +- **No optimizer.** + Plans are executed as written. + Node ordering, join shape, and antijoin scheduling are all the producer's responsibility. + This crate's job ends at faithful execution of the IR. +- **Wildcard columns survive.** + `scan_atom` keeps every distinct variable that appears in the pattern, + including the exporter's synthetic `_w_` names. + The runner does not project them out; + oracle verification handles that on the comparison side. +- **Bulk, not streaming.** + Each node materializes its full output as a `Relation`. + This matches `query-ops`' execution model; + it's not designed for incremental or maintained-view workloads. diff --git a/crates/plan-runner/fixtures/cartesian.json b/crates/plan-runner/fixtures/cartesian.json new file mode 100644 index 0000000..0f8a766 --- /dev/null +++ b/crates/plan-runner/fixtures/cartesian.json @@ -0,0 +1,114 @@ +{ + "_scenario": "cartesian", + "expected_bindings": { + "columns": [ + "a", + "b" + ], + "rows": [ + [ + { + "str": "left:1" + }, + { + "str": "right:10" + } + ], + [ + { + "str": "left:1" + }, + { + "str": "right:20" + } + ], + [ + { + "str": "left:2" + }, + { + "str": "right:10" + } + ], + [ + { + "str": "left:2" + }, + { + "str": "right:20" + } + ] + ] + }, + "facts": { + "left": [ + [ + { + "str": "left:1" + } + ], + [ + { + "str": "left:2" + } + ] + ], + "right": [ + [ + { + "str": "right:10" + } + ], + [ + { + "str": "right:20" + } + ] + ] + }, + "query": { + "nodes": [ + { + "action": { + "scan": { + "columns": [ + { + "var": "a" + } + ], + "table": "left" + } + }, + "id": 1 + }, + { + "action": { + "scan": { + "columns": [ + { + "var": "b" + } + ], + "table": "right" + } + }, + "id": 2 + }, + { + "action": { + "join": { + "left": 1, + "op": "natural", + "right": 2 + } + }, + "id": 3 + } + ], + "root": 3 + }, + "schema": { + "left": 1, + "right": 1 + } +} diff --git a/crates/plan-runner/fixtures/self_loop.json b/crates/plan-runner/fixtures/self_loop.json new file mode 100644 index 0000000..7bcffef --- /dev/null +++ b/crates/plan-runner/fixtures/self_loop.json @@ -0,0 +1,84 @@ +{ + "_scenario": "self-loop", + "expected_bindings": { + "columns": [ + "x" + ], + "rows": [ + [ + { + "str": "node:2" + } + ], + [ + { + "str": "node:3" + } + ] + ] + }, + "facts": { + "edge": [ + [ + { + "str": "node:1" + }, + { + "str": "node:2" + }, + { + "str": "edge:1" + } + ], + [ + { + "str": "node:2" + }, + { + "str": "node:2" + }, + { + "str": "edge:2" + } + ], + [ + { + "str": "node:3" + }, + { + "str": "node:3" + }, + { + "str": "edge:3" + } + ] + ] + }, + "query": { + "nodes": [ + { + "action": { + "scan": { + "columns": [ + { + "var": "x" + }, + { + "var": "x" + }, + { + "var": "_w0_2" + } + ], + "table": "edge" + } + }, + "id": 1 + } + ], + "root": 1 + }, + "schema": { + "edge": 3 + } +} diff --git a/crates/glog-runner/fixtures/three_atom_chain.json b/crates/plan-runner/fixtures/three_atom_chain.json similarity index 91% rename from crates/glog-runner/fixtures/three_atom_chain.json rename to crates/plan-runner/fixtures/three_atom_chain.json index c8d3f7e..91a0b24 100644 --- a/crates/glog-runner/fixtures/three_atom_chain.json +++ b/crates/plan-runner/fixtures/three_atom_chain.json @@ -1,5 +1,25 @@ { "_scenario": "three-atom-chain", + "expected_bindings": { + "columns": [ + "a", + "b", + "c" + ], + "rows": [ + [ + { + "str": "node:1" + }, + { + "str": "node:2" + }, + { + "str": "node:3" + } + ] + ] + }, "facts": { "edge": [ [ diff --git a/crates/plan-runner/fixtures/two_atom_join.json b/crates/plan-runner/fixtures/two_atom_join.json new file mode 100644 index 0000000..5b7a63e --- /dev/null +++ b/crates/plan-runner/fixtures/two_atom_join.json @@ -0,0 +1,136 @@ +{ + "_scenario": "two-atom-join", + "expected_bindings": { + "columns": [ + "a", + "b" + ], + "rows": [ + [ + { + "str": "node:1" + }, + { + "str": "node:2" + } + ], + [ + { + "str": "node:2" + }, + { + "str": "node:1" + } + ] + ] + }, + "facts": { + "edge": [ + [ + { + "str": "node:1" + }, + { + "str": "node:2" + }, + { + "str": "edge:1" + } + ], + [ + { + "str": "node:2" + }, + { + "str": "node:1" + }, + { + "str": "edge:2" + } + ] + ], + "node": [ + [ + { + "str": "node:1" + } + ], + [ + { + "str": "node:2" + } + ] + ] + }, + "query": { + "nodes": [ + { + "action": { + "scan": { + "columns": [ + { + "var": "a" + }, + { + "var": "b" + }, + { + "var": "_w0_2" + } + ], + "table": "edge" + } + }, + "id": 1 + }, + { + "action": { + "scan": { + "columns": [ + { + "var": "a" + } + ], + "table": "node" + } + }, + "id": 2 + }, + { + "action": { + "join": { + "left": 1, + "op": "left", + "right": 2 + } + }, + "id": 3 + }, + { + "action": { + "join": { + "left": 3, + "op": "right", + "right": 2 + } + }, + "id": 4 + }, + { + "action": { + "join": { + "left": 3, + "op": "natural", + "right": 4 + } + }, + "id": 5 + } + ], + "root": 5 + }, + "schema": { + "edge": 3, + "node": 1 + } +} diff --git a/crates/plan-runner/src/lib.rs b/crates/plan-runner/src/lib.rs new file mode 100644 index 0000000..7cdcf28 --- /dev/null +++ b/crates/plan-runner/src/lib.rs @@ -0,0 +1,540 @@ +//! Snapshot executor for conjunctive-query plans. +//! +//! Takes a structural plan (a DAG of `Scan` and `Join` nodes), the input +//! tables it scans, and walks the DAG via [`query_ops::atom::scan_atom`], +//! [`query_ops::join::semijoin`], and [`query_ops::join::natural_join`]. +//! The result is a binding [`Relation`](query_ops::relation::Relation) over +//! the query's variables. +//! +//! The runner is intentionally backend-agnostic: it depends only on +//! `query-ops`, and the planner that emits the JSON IR is decoupled from +//! the storage backend that produced the facts. To execute a plan against +//! a [`Storage`](storage::Storage) backend, materialize each input table +//! with [`storage::scan_as_table`] and call [`execute`] with the resulting +//! map. The in-tree `tests/storage_roundtrip.rs` is the canonical example. +//! +//! The JSON IR mirrors `Geolog.DB.Plan.PlanGraph` and +//! `Geolog.DB.InMemory.QAtom` from the `external/geolog` submodule, but the +//! shape is the contract: any frontend that emits this JSON can use the +//! runner. +//! +//! Operator mapping from the Haskell planner: +//! +//! | `Geolog.DB.Plan` | this crate | +//! |-----------------------------|-----------------------------------------------| +//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` | +//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` | +//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` | +//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` | +//! +//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`] +//! repeated across positions enforces equality, [`Term::Lit`] filters by +//! constant, and distinct variables project in first-occurrence order. + +use std::collections::HashMap; + +use serde::Deserialize; + +use query_ops::atom::{AtomPattern, Term, scan_atom}; +use query_ops::join::{natural_join, semijoin}; +use query_ops::relation::Relation; +use storage::table::Table; +use storage::value::Value; +use storage::{Storage, StorageError, scan_as_table}; + +/// A single fixture: schema, ground facts, and a query plan to execute. +#[derive(Debug, Clone, Deserialize)] +pub struct Plan { + /// Relation name → arity (column count). + pub schema: HashMap, + /// Relation name → list of ground tuples to insert before execution. + pub facts: HashMap>>, + /// The join DAG itself. + pub query: Query, + /// Optional oracle: if present, [`verify`] cross-checks an executed + /// [`Relation`] against this projection. The exporter lifts the + /// scenario's `expected_bindings` block into this field. + #[serde(default)] + pub expected_bindings: Option, +} + +/// Expected query result, projected to a named subset of variables. The +/// columns named here must all appear in the runner's output; any extra +/// columns (typically per-atom wildcards) are ignored. +#[derive(Debug, Clone, Deserialize)] +pub struct ExpectedBindings { + pub columns: Vec, + pub rows: Vec>, +} + +/// Mirrors `Geolog.DB.Plan.PlanGraph`: a set of nodes plus the id of the +/// rooted result node (the last node in topological order). +#[derive(Debug, Clone, Deserialize)] +pub struct Query { + pub root: u32, + pub nodes: Vec, +} + +/// One node of the plan DAG. `id`s don't need to start at any particular +/// value, mirroring the Haskell `PlanNodeId`. +#[derive(Debug, Clone, Deserialize)] +pub struct Node { + pub id: u32, + pub action: Action, +} + +/// What to compute at a node. Tagged externally so JSON reads as +/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Action { + Scan(Atom), + Join(Join), +} + +/// A flat atom pattern, one entry per column of the target relation. +/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`: +/// `qaValues` positions are filled in directly, and the entity-id column +/// (if any) appears at the last position. Wildcard positions in the +/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a +/// fresh, unique variable name on this side, which the operator binds but +/// never joins against. +#[derive(Debug, Clone, Deserialize)] +pub struct Atom { + pub table: String, + pub columns: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum JsonTerm { + Var(String), + Lit(JsonValue), +} + +/// Wire-level value tag. Restricted to what +/// [`storage::value::Value`](storage::value::Value) carries. Entity identities from +/// the Haskell side (`ValEntity path id`) round-trip through `Str` using a +/// `"path:id"` convention; that's a fixture concern, not a runner concern. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum JsonValue { + Int(i64), + Str(String), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Join { + pub op: JoinOp, + pub left: u32, + pub right: u32, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum JoinOp { + /// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns + /// appear in `right`. Lowered to `semijoin(left, right)`. + Left, + /// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared + /// columns appear in `left`. Lowered to `semijoin(right, left)`. + Right, + /// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`. + Natural, +} + +/// Errors produced by [`verify`] when actual bindings don't match the +/// scenario's `expected_bindings` projection. +#[derive(Debug)] +pub enum VerifyError { + /// An expected column wasn't produced by the plan. + MissingColumn(String), + /// An expected row's width didn't match the column count. + ExpectedRowArity { expected: usize, got: usize }, + /// The expected and actual rows (after projection) differ as multisets. + BindingsMismatch { + expected: Vec>, + actual: Vec>, + }, +} + +impl std::fmt::Display for VerifyError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::MissingColumn(name) => { + write!(f, "expected column {name:?} not in plan output") + } + Self::ExpectedRowArity { expected, got } => write!( + f, + "expected row has {got} cells but columns has {expected} entries" + ), + Self::BindingsMismatch { expected, actual } => write!( + f, + "bindings mismatch:\n expected: {expected:?}\n actual: {actual:?}" + ), + } + } +} + +impl std::error::Error for VerifyError {} + +/// Cross-check an executed [`Relation`] against a [`Plan`]'s +/// `expected_bindings`. Projects `actual` to the expected columns (so the +/// runner is free to surface wildcard columns the oracle doesn't name) and +/// compares as a multiset. +/// +/// Returns `Ok(true)` if the plan carried an oracle and it matched, +/// `Ok(false)` if there was no oracle (caller decides whether that's an +/// error). Returns [`VerifyError`] on mismatch. +/// +/// # Errors +/// See [`VerifyError`]. +pub fn verify(plan: &Plan, actual: &Relation) -> Result { + let Some(expected) = &plan.expected_bindings else { + return Ok(false); + }; + let mut projection: Vec = Vec::with_capacity(expected.columns.len()); + for col in &expected.columns { + let idx = actual + .columns + .iter() + .position(|c| c == col) + .ok_or_else(|| VerifyError::MissingColumn(col.clone()))?; + projection.push(idx); + } + let mut actual_proj: Vec> = actual + .rows + .iter() + .map(|row| projection.iter().map(|&i| row[i].clone()).collect()) + .collect(); + let mut expected_proj: Vec> = Vec::with_capacity(expected.rows.len()); + for row in &expected.rows { + if row.len() != expected.columns.len() { + return Err(VerifyError::ExpectedRowArity { + expected: expected.columns.len(), + got: row.len(), + }); + } + expected_proj.push(row.iter().cloned().map(Value::from).collect()); + } + // Value is not Ord; use Debug-derived sort keys to compare as a multiset. + let key = |row: &[Value]| -> String { format!("{row:?}") }; + actual_proj.sort_by_key(|r| key(r)); + expected_proj.sort_by_key(|r| key(r)); + if actual_proj == expected_proj { + Ok(true) + } else { + Err(VerifyError::BindingsMismatch { + expected: expected_proj, + actual: actual_proj, + }) + } +} + +/// Errors a runner can produce during plan validation and execution. +#[derive(Debug)] +pub enum RunError { + /// A fact or scan references a relation that isn't declared in `schema`. + UnknownRelation(String), + /// A scan refers to a table that wasn't supplied in the input map. + MissingTable(String), + /// A fact row's length doesn't match the schema's declared arity. + ArityMismatch { + relation: String, + expected: usize, + got: usize, + }, + /// A scan's atom pattern doesn't match the table's arity. + PatternArityMismatch { + table: String, + table_arity: usize, + pattern_arity: usize, + }, + /// A join node references a node id that doesn't exist. + MissingNode(u32), + /// `Query.root` doesn't match any node in `nodes`. + MissingRoot(u32), + /// Two nodes share the same id. + DuplicateNode(u32), + /// A join node references its left or right side before that side has + /// been computed: the DAG isn't actually topologically sorted by id, or + /// it has a cycle. + UnresolvedDependency { node: u32, depends_on: u32 }, + /// A [`Storage`] backend used to materialize tables returned an error. + Storage(StorageError), +} + +impl From for RunError { + fn from(err: StorageError) -> Self { + Self::Storage(err) + } +} + +impl std::fmt::Display for RunError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::UnknownRelation(name) => { + write!(f, "facts reference relation {name:?} not in schema") + } + Self::MissingTable(name) => write!(f, "scan references missing table {name:?}"), + Self::ArityMismatch { + relation, + expected, + got, + } => write!( + f, + "relation {relation:?}: row arity {got} differs from schema arity {expected}" + ), + Self::PatternArityMismatch { + table, + table_arity, + pattern_arity, + } => write!( + f, + "scan of {table:?}: pattern has {pattern_arity} columns, table has {table_arity}" + ), + Self::MissingNode(id) => write!(f, "plan references missing node id {id}"), + Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"), + Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"), + Self::UnresolvedDependency { node, depends_on } => write!( + f, + "node {node} depends on {depends_on}, which has not been computed yet" + ), + Self::Storage(err) => write!(f, "storage backend error: {err}"), + } + } +} + +impl std::error::Error for RunError {} + +impl From for Value { + fn from(jv: JsonValue) -> Self { + match jv { + JsonValue::Int(n) => Self::Int(n), + JsonValue::Str(s) => Self::Str(s), + } + } +} + +impl From for Term { + fn from(t: JsonTerm) -> Self { + match t { + JsonTerm::Var(name) => Self::Var(name), + JsonTerm::Lit(value) => Self::Lit(value.into()), + } + } +} + +/// Parse a [`Plan`] from a JSON string. +/// +/// # Errors +/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the +/// expected shape. +pub fn parse_plan(json: &str) -> Result { + serde_json::from_str(json) +} + +/// Build the input [`Table`] for each relation declared in a [`Plan`]'s +/// schema, populating rows from the plan's `facts` map. Relations with no +/// facts get an empty table at the declared arity. +/// +/// # Errors +/// Returns [`RunError::UnknownRelation`] if `facts` mentions a relation +/// not in `schema`, or [`RunError::ArityMismatch`] if a row's width doesn't +/// match the declared arity. +pub fn build_tables(plan: &Plan) -> Result, RunError> { + let mut tables: HashMap = plan + .schema + .iter() + .map(|(name, arity)| (name.clone(), Table::new(*arity))) + .collect(); + for (name, rows) in &plan.facts { + let Some(table) = tables.get_mut(name) else { + return Err(RunError::UnknownRelation(name.clone())); + }; + for row in rows { + if row.len() != table.arity { + return Err(RunError::ArityMismatch { + relation: name.clone(), + expected: table.arity, + got: row.len(), + }); + } + let cells: Vec = row.iter().cloned().map(Value::from).collect(); + table.push(cells); + } + } + Ok(tables) +} + +/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then +/// materialize each declared relation back into an in-memory [`Table`] via +/// [`scan_as_table`]. The returned map is the same shape [`execute`] +/// consumes, so this is the storage-backed analogue of [`build_tables`]. +/// +/// Adding a new backend means constructing a different `S` at the call +/// site; the body here doesn't need to change. +/// +/// # Errors +/// Returns [`RunError::UnknownRelation`] or [`RunError::ArityMismatch`] on +/// the same conditions as [`build_tables`], or [`RunError::Storage`] when +/// the backend itself rejects an operation. +pub fn build_tables_via_storage( + plan: &Plan, + storage: &mut S, +) -> Result, RunError> { + for (name, arity) in &plan.schema { + storage.create_relation(name, *arity)?; + } + { + let mut tx = storage.transaction()?; + for (name, rows) in &plan.facts { + let Some(&arity) = plan.schema.get(name) else { + return Err(RunError::UnknownRelation(name.clone())); + }; + for row in rows { + if row.len() != arity { + return Err(RunError::ArityMismatch { + relation: name.clone(), + expected: arity, + got: row.len(), + }); + } + let cells: Vec = row.iter().cloned().map(Value::from).collect(); + tx.insert(name, cells)?; + } + } + tx.commit()?; + } + let mut tables: HashMap = HashMap::with_capacity(plan.schema.len()); + for name in plan.schema.keys() { + let table = scan_as_table(storage as &dyn Storage, name)?; + tables.insert(name.clone(), table); + } + Ok(tables) +} + +/// Execute a query DAG against the supplied input tables, returning the +/// bindings [`Relation`] for the rooted plan node. +/// +/// Nodes are executed in ascending `id` order. For a Yannakakis plan as +/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort, +/// since `insertJoin` only references node ids that have already been +/// allocated. A non-monotone id ordering is rejected with +/// [`RunError::UnresolvedDependency`]. +/// +/// # Errors +/// Returns [`RunError::DuplicateNode`] for repeated ids, +/// [`RunError::MissingNode`] for join references to unknown ids, +/// [`RunError::MissingRoot`] if `query.root` isn't present, +/// [`RunError::MissingTable`] if a scan references a table not in the map, +/// or [`RunError::PatternArityMismatch`] if a scan's pattern doesn't match +/// the table's arity. +pub fn execute( + tables: &HashMap, + query: &Query, +) -> Result { + let mut seen_ids: std::collections::HashSet = + std::collections::HashSet::with_capacity(query.nodes.len()); + for node in &query.nodes { + if !seen_ids.insert(node.id) { + return Err(RunError::DuplicateNode(node.id)); + } + } + if !seen_ids.contains(&query.root) { + return Err(RunError::MissingRoot(query.root)); + } + + let mut ordered: Vec<&Node> = query.nodes.iter().collect(); + ordered.sort_by_key(|n| n.id); + + let mut results: HashMap = HashMap::with_capacity(ordered.len()); + for node in ordered { + let computed = match &node.action { + Action::Scan(atom) => { + let table = tables + .get(&atom.table) + .ok_or_else(|| RunError::MissingTable(atom.table.clone()))?; + if atom.columns.len() != table.arity { + return Err(RunError::PatternArityMismatch { + table: atom.table.clone(), + table_arity: table.arity, + pattern_arity: atom.columns.len(), + }); + } + let pattern = AtomPattern { + columns: atom.columns.iter().cloned().map(Term::from).collect(), + }; + scan_atom(table, &pattern) + } + Action::Join(join) => { + let left = require_dep(&results, &seen_ids, node.id, join.left)?; + let right = require_dep(&results, &seen_ids, node.id, join.right)?; + match join.op { + JoinOp::Left => semijoin(left, right), + JoinOp::Right => semijoin(right, left), + JoinOp::Natural => natural_join(left, right), + } + } + }; + results.insert(node.id, computed); + } + + results + .remove(&query.root) + .ok_or(RunError::MissingRoot(query.root)) +} + +fn require_dep<'a>( + results: &'a HashMap, + seen: &std::collections::HashSet, + node: u32, + depends_on: u32, +) -> Result<&'a Relation, RunError> { + if let Some(rel) = results.get(&depends_on) { + Ok(rel) + } else if seen.contains(&depends_on) { + Err(RunError::UnresolvedDependency { node, depends_on }) + } else { + Err(RunError::MissingNode(depends_on)) + } +} + +/// Convenience: parse JSON, build tables from the embedded facts, and +/// execute, returning the root binding relation. Equivalent to +/// `parse_plan` + [`build_tables`] + [`execute`]. +/// +/// # Errors +/// Returns a JSON parse error if the input is malformed, or a [`RunError`] +/// for any later step. +pub fn run_json(json: &str) -> Result { + let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?; + let tables = build_tables(&plan).map_err(RunFromJsonError::Run)?; + let bindings = execute(&tables, &plan.query).map_err(RunFromJsonError::Run)?; + Ok(bindings) +} + +/// Combined error from [`run_json`]. +#[derive(Debug)] +pub enum RunFromJsonError { + Parse(serde_json::Error), + Run(RunError), +} + +impl std::fmt::Display for RunFromJsonError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Parse(err) => write!(f, "parse error: {err}"), + Self::Run(err) => write!(f, "run error: {err}"), + } + } +} + +impl std::error::Error for RunFromJsonError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Parse(err) => Some(err), + Self::Run(err) => Some(err), + } + } +} diff --git a/crates/plan-runner/src/main.rs b/crates/plan-runner/src/main.rs new file mode 100644 index 0000000..c76b440 --- /dev/null +++ b/crates/plan-runner/src/main.rs @@ -0,0 +1,225 @@ +//! `plan-run` CLI: read a JSON plan from a file (or stdin if `-`), execute +//! it against the chosen backend, and print the resulting binding relation +//! as JSON on stdout. +//! +//! Backends: +//! +//! - `memory` (default): build tables straight from the plan's `facts` +//! block, no `Storage` trait involved. Pure in-memory path. +//! - `memory-storage`: load the same facts through `storage::MemoryStorage` +//! via the `Storage` trait, then materialize tables back out with +//! `scan_as_table` before executing. +//! - `lmdb`, `redb`, `fjall`, `sqlite`: file-backed `Storage` adapters. +//! Each invocation creates a fresh tempdir for the store and drops it on +//! exit; the runner is one-shot, so persistent paths aren't needed. +//! - `geomerge`: CRDT-backed adapter. Constructed in-memory; alpha-status +//! upstream. + +use std::collections::HashMap; +use std::io::{self, Read}; +use std::process::ExitCode; + +use plan_runner::{JsonValue, Plan, build_tables, build_tables_via_storage, execute, parse_plan}; +use storage::MemoryStorage; +use storage::adapters::fjall::FjallStorage; +use storage::adapters::geomerge::{ColumnKind, GeomergeStorage}; +use storage::adapters::lmdb::LmdbStorage; +use storage::adapters::redb::RedbStorage; +use storage::adapters::sqlite::SqliteStorage; +use storage::table::Table; +use storage::value::Value; +use tempfile::TempDir; + +#[derive(Debug, Clone, Copy)] +enum Backend { + Memory, + MemoryStorage, + Lmdb, + Redb, + Fjall, + Sqlite, + Geomerge, +} + +impl Backend { + fn parse(s: &str) -> Option { + match s { + "memory" => Some(Self::Memory), + "memory-storage" => Some(Self::MemoryStorage), + "lmdb" => Some(Self::Lmdb), + "redb" => Some(Self::Redb), + "fjall" => Some(Self::Fjall), + "sqlite" => Some(Self::Sqlite), + "geomerge" => Some(Self::Geomerge), + _ => None, + } + } +} + +const BACKENDS_HELP: &str = "memory|memory-storage|lmdb|redb|fjall|sqlite|geomerge"; + +fn main() -> ExitCode { + let mut backend = Backend::Memory; + let mut input_path: Option = None; + let mut args = std::env::args().skip(1); + while let Some(arg) = args.next() { + match arg.as_str() { + "--backend" => { + let Some(value) = args.next() else { + eprintln!("--backend requires a value ({BACKENDS_HELP})"); + return ExitCode::from(2); + }; + let Some(parsed) = Backend::parse(&value) else { + eprintln!("unknown backend {value:?} (try {BACKENDS_HELP})"); + return ExitCode::from(2); + }; + backend = parsed; + } + other if input_path.is_none() => input_path = Some(other.to_string()), + other => { + eprintln!("unexpected argument: {other}"); + return ExitCode::from(2); + } + } + } + let Some(path) = input_path else { + eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] "); + return ExitCode::from(2); + }; + + let input = match read_input(&path) { + Ok(s) => s, + Err(err) => { + eprintln!("failed to read {path}: {err}"); + return ExitCode::from(1); + } + }; + + let plan = match parse_plan(&input) { + Ok(p) => p, + Err(err) => { + eprintln!("parse error: {err}"); + return ExitCode::from(1); + } + }; + + let tables = match build_tables_for(&plan, backend) { + Ok(t) => t, + Err(err) => { + eprintln!("{err}"); + return ExitCode::from(1); + } + }; + + let relation = match execute(&tables, &plan.query) { + Ok(r) => r, + Err(err) => { + eprintln!("execute error: {err}"); + return ExitCode::from(1); + } + }; + + let payload = serde_json::json!({ + "columns": relation.columns, + "rows": relation + .rows + .iter() + .map(|row| row.iter().map(value_to_json).collect::>()) + .collect::>(), + }); + println!("{payload}"); + ExitCode::SUCCESS +} + +/// Build the input tables for `plan` using `backend`. Path-based adapters +/// allocate a fresh tempdir; it drops at the end of this function, which is +/// safe because `build_tables_via_storage` fully materializes the tables +/// into owned `Vec` before returning. +fn build_tables_for(plan: &Plan, backend: Backend) -> Result, String> { + match backend { + Backend::Memory => build_tables(plan).map_err(|e| format!("build error: {e}")), + Backend::MemoryStorage => { + let mut storage = MemoryStorage::default(); + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (memory-storage): {e}")) + } + Backend::Lmdb => { + let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; + let mut storage = LmdbStorage::open(dir.path()) + .map_err(|e| format!("failed to open lmdb backend: {e}"))?; + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (lmdb): {e}")) + } + Backend::Redb => { + let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; + let mut storage = RedbStorage::open(dir.path().join("data.redb")) + .map_err(|e| format!("failed to open redb backend: {e}"))?; + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (redb): {e}")) + } + Backend::Fjall => { + let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; + let mut storage = FjallStorage::open(dir.path()) + .map_err(|e| format!("failed to open fjall backend: {e}"))?; + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (fjall): {e}")) + } + Backend::Sqlite => { + let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; + let mut storage = SqliteStorage::open(dir.path().join("data.sqlite")) + .map_err(|e| format!("failed to open sqlite backend: {e}"))?; + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (sqlite): {e}")) + } + Backend::Geomerge => { + let relations = plan + .schema + .iter() + .map(|(name, &arity)| (name.clone(), infer_column_kinds(plan, name, arity))); + let mut storage = GeomergeStorage::with_relations(relations) + .map_err(|e| format!("failed to open geomerge backend: {e}"))?; + build_tables_via_storage(plan, &mut storage) + .map_err(|e| format!("build error (geomerge): {e}")) + } + } +} + +/// Best-effort column type inference for `geomerge`'s synthesized theory. +/// The runner IR carries only arity, so we peek at the first fact row of +/// the relation. Columns without a sample default to `String`, which +/// matches every checked-in fixture (entity identities are encoded as +/// strings by the exporter). +fn infer_column_kinds(plan: &Plan, name: &str, arity: usize) -> Vec { + let mut kinds = vec![ColumnKind::String; arity]; + let Some(rows) = plan.facts.get(name) else { + return kinds; + }; + let Some(first) = rows.first() else { + return kinds; + }; + for (i, cell) in first.iter().take(arity).enumerate() { + kinds[i] = match cell { + JsonValue::Int(_) => ColumnKind::Int, + JsonValue::Str(_) => ColumnKind::String, + }; + } + kinds +} + +fn read_input(path: &str) -> io::Result { + if path == "-" { + let mut buf = String::new(); + io::stdin().read_to_string(&mut buf)?; + Ok(buf) + } else { + std::fs::read_to_string(path) + } +} + +fn value_to_json(value: &Value) -> serde_json::Value { + match value { + Value::Int(n) => serde_json::Value::Number((*n).into()), + Value::Str(s) => serde_json::Value::String(s.clone()), + Value::Id(id) => serde_json::Value::String(id.to_string()), + } +} diff --git a/crates/plan-runner/tests/examples.rs b/crates/plan-runner/tests/examples.rs new file mode 100644 index 0000000..1128431 --- /dev/null +++ b/crates/plan-runner/tests/examples.rs @@ -0,0 +1,77 @@ +//! Walks every JSON fixture under `crates/plan-runner/fixtures/` and +//! verifies it against the `expected_bindings` the exporter lifted from +//! the matching `tools/exporter/examples/*.scenario.json`. A fixture without an oracle +//! is reported as a failure (every checked-in fixture is expected to +//! carry one). + +use std::collections::BTreeMap; +use std::fs; +use std::path::PathBuf; + +use plan_runner::{parse_plan, run_json, verify}; + +fn fixtures_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures") +} + +fn collect_fixtures() -> BTreeMap { + let mut out = BTreeMap::new(); + for entry in fs::read_dir(fixtures_dir()).expect("read fixtures/") { + let path = entry.expect("dir entry").path(); + if path.extension().and_then(|e| e.to_str()) != Some("json") { + continue; + } + let name = path + .file_stem() + .and_then(|s| s.to_str()) + .expect("ascii fixture name") + .to_string(); + let contents = fs::read_to_string(&path).expect("read fixture"); + out.insert(name, contents); + } + out +} + +#[test] +fn every_fixture_runs_and_matches_its_oracle() { + let fixtures = collect_fixtures(); + assert!( + !fixtures.is_empty(), + "no fixtures found in {}", + fixtures_dir().display() + ); + + let mut failures: Vec = Vec::new(); + for (name, json) in &fixtures { + let plan = match parse_plan(json) { + Ok(p) => p, + Err(err) => { + failures.push(format!("{name}: parse error: {err}")); + continue; + } + }; + if plan.expected_bindings.is_none() { + failures.push(format!("{name}: fixture has no expected_bindings")); + continue; + } + let relation = match run_json(json) { + Ok(r) => r, + Err(err) => { + failures.push(format!("{name}: run error: {err}")); + continue; + } + }; + match verify(&plan, &relation) { + Ok(true) => {} + Ok(false) => failures.push(format!("{name}: verify returned no-oracle unexpectedly")), + Err(err) => failures.push(format!("{name}: {err}")), + } + } + + assert!( + failures.is_empty(), + "{} fixture(s) failed:\n {}", + failures.len(), + failures.join("\n ") + ); +} diff --git a/crates/plan-runner/tests/storage_roundtrip.rs b/crates/plan-runner/tests/storage_roundtrip.rs new file mode 100644 index 0000000..ba2b37a --- /dev/null +++ b/crates/plan-runner/tests/storage_roundtrip.rs @@ -0,0 +1,52 @@ +//! Cross-checks the two paths [`plan-runner`] exposes for materializing +//! input tables: the pure [`build_tables`] path and the [`Storage`]-routed +//! [`build_tables_via_storage`] path. Same fixture, same plan, must agree +//! row-for-row. +//! +//! This is the visible proof of the layer boundary: any new `Storage` +//! backend (LMDB, fjall, geomerge) keeps this test honest by re-running it +//! with a different `S`. + +use plan_runner::{build_tables, build_tables_via_storage, execute, parse_plan, run_json}; +use storage::MemoryStorage; +use storage::value::Value; + +const FIXTURE: &str = include_str!("../fixtures/three_atom_chain.json"); + +#[test] +fn storage_backed_execution_matches_pure_path() { + let plan = parse_plan(FIXTURE).expect("parse plan"); + + let pure_tables = build_tables(&plan).expect("build_tables"); + let pure = execute(&pure_tables, &plan.query).expect("pure execute"); + + let mut storage = MemoryStorage::default(); + let storage_tables = + build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage"); + let via_storage = execute(&storage_tables, &plan.query).expect("storage execute"); + + assert_eq!(pure.columns, via_storage.columns); + // Scan order between MemoryStorage and the direct-from-JSON path isn't + // required to match; compare rows as a multiset. `Value` is not `Ord` + // (it carries `RowId` and `String`), so use a Debug-derived sort key. + assert_eq!(sorted_rows(&pure.rows), sorted_rows(&via_storage.rows)); +} + +#[test] +fn storage_backed_execution_matches_run_json_oracle() { + let plan = parse_plan(FIXTURE).expect("parse plan"); + let oracle = run_json(FIXTURE).expect("run_json"); + + let mut storage = MemoryStorage::default(); + let tables = build_tables_via_storage(&plan, &mut storage).expect("build_tables_via_storage"); + let via_storage = execute(&tables, &plan.query).expect("storage execute"); + + assert_eq!(oracle.columns, via_storage.columns); + assert_eq!(sorted_rows(&oracle.rows), sorted_rows(&via_storage.rows)); +} + +fn sorted_rows(rows: &[Vec]) -> Vec { + let mut keys: Vec = rows.iter().map(|r| format!("{r:?}")).collect(); + keys.sort(); + keys +} diff --git a/crates/query-ops/README.md b/crates/query-ops/README.md index bde49be..35494b3 100644 --- a/crates/query-ops/README.md +++ b/crates/query-ops/README.md @@ -121,7 +121,7 @@ How it works (logically):
- Types + Workflow
diff --git a/crates/query-ops/src/lib.rs b/crates/query-ops/src/lib.rs index f06b4b1..ffee8b2 100644 --- a/crates/query-ops/src/lib.rs +++ b/crates/query-ops/src/lib.rs @@ -2,9 +2,9 @@ //! //! Three operators are in scope: //! -//! - [`atom::scan_atom`] scans a [`Table`](storage::table::Table) under -//! an [`atom::AtomPattern`], filtering for repeated-variable equality and -//! literal equality, and outputs a binding [`relation::Relation`]. +//! - [`atom::scan_atom`] scans a [`Table`] under an [`atom::AtomPattern`], +//! filtering for repeated-variable equality and literal equality, and +//! outputs a binding [`relation::Relation`]. //! - [`join::semijoin`] keeps rows of one relation whose shared-column values //! appear in another. //! - [`join::natural_join`] combines rows that agree on shared columns, @@ -14,10 +14,8 @@ //! is just an expression like //! `natural_join(&semijoin(&a, &b), &scan_atom(&t, &p))`. //! -//! Foundational types [`Value`](storage::value::Value) and -//! [`Table`](storage::table::Table) live in `storage`, the -//! storage-layer crate this crate is built on; storage backends produce -//! `Table`s that operators here consume. +//! `Value` and `Table` live in the `storage` crate; consumers that build +//! inputs depend on `storage` directly. pub mod atom; pub mod join; diff --git a/crates/storage/README.md b/crates/storage/README.md index ae8354d..7f042c7 100644 --- a/crates/storage/README.md +++ b/crates/storage/README.md @@ -24,7 +24,8 @@ This crates helps with decoupling the query execution logic from the underlying | `adapters::redb::RedbStorage` | struct (feat) | Single-file B-tree backed `Storage`, behind the `redb` feature. Wraps `redb::WriteTransaction` for native atomic commits. | | `adapters::fjall::FjallStorage` | struct (feat) | LSM-tree backed `Storage`, behind the `fjall` feature. Each relation gets a partition; transactions buffer inserts and apply them on commit. | | `adapters::lmdb::LmdbStorage` | struct (feat) | mmap'd B-tree backed `Storage`, behind the `lmdb` feature. Wraps `heed`'s `RwTxn` for native atomic commits. | -| `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). | +| `adapters::geomerge::GeomergeStorage` | struct (feat) | CRDT-backed `Storage` over the workspace's `geomerge` crate, behind the `geomerge` feature. Wraps `geomerge::Transaction` and resolves pending row IDs via `CommittedTx`. Deletion is not supported (append-only log). Construct with `from_theory`, `from_store`, or `with_relations` (synthesizes a theory from `(name, Vec)` for callers that lack a typed schema). | +| `adapters::geomerge::ColumnKind` | enum (feat) | Primitive column type fed to `GeomergeStorage::with_relations`: `Int` maps to geomerge `PrimInt`, `String` maps to `PrimString`. Exists so callers can synthesize a theory without depending on `geolog-lang::ir` directly. | Data types and their relationships: diff --git a/crates/storage/src/adapters/geomerge.rs b/crates/storage/src/adapters/geomerge.rs index 7539105..6109feb 100644 --- a/crates/storage/src/adapters/geomerge.rs +++ b/crates/storage/src/adapters/geomerge.rs @@ -93,6 +93,17 @@ fn decode_pending_row_id(bytes: &[u8]) -> Result { } /// Geomerge-backed [`Storage`] implementation. +/// Primitive column type used by [`GeomergeStorage::with_relations`] to +/// synthesize a theory from an untyped `(name, arity)` schema. Geomerge +/// supports `PrimInt`, `PrimString`, and entity types; only the two +/// primitives are exposed here, since callers using this constructor by +/// definition don't carry entity-target information. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ColumnKind { + Int, + String, +} + pub struct GeomergeStorage { store: Store, declared: HashSet, @@ -138,6 +149,52 @@ impl GeomergeStorage { } } + /// Build a store with a theory synthesized from a flat list of + /// `(relation_name, column_kinds)`. Each `ColumnKind` is mapped to the + /// matching `PrimType`. No entity columns and no laws are declared. + /// + /// This is the convenience constructor for callers (e.g., the + /// `plan-runner` CLI) whose schema only carries arity plus a column-by- + /// column primitive-type guess taken from a sample row. It exists so + /// those callers don't have to depend on `geolog-lang::ir` directly. + /// + /// # Errors + /// Returns [`StorageError::Backend`] if geomerge rejects the synthesized + /// theory. + pub fn with_relations(relations: I) -> Result + where + I: IntoIterator)>, + S: Into, + { + let tables: Vec = relations + .into_iter() + .map(|(name, kinds)| { + let columns = kinds + .into_iter() + .map(|k| ir::ColType::PrimType { + prim: match k { + ColumnKind::Int => ir::PrimType::PrimInt, + ColumnKind::String => ir::PrimType::PrimString, + }, + }) + .collect(); + let name: String = name.into(); + ir::TableEntry { + path: name.into(), + table: ir::Schema { + columns, + primary_key: None, + }, + } + }) + .collect(); + let theory = ir::FlatTheory { + tables, + laws: Vec::new(), + }; + Self::from_theory(theory) + } + /// Borrow the underlying geomerge store (for backend-specific operations /// like persistence, dump, or law inspection that aren't on the trait). #[must_use] diff --git a/crates/storage/src/adapters/lmdb.rs b/crates/storage/src/adapters/lmdb.rs index 5fd8353..dd44267 100644 --- a/crates/storage/src/adapters/lmdb.rs +++ b/crates/storage/src/adapters/lmdb.rs @@ -154,12 +154,12 @@ impl Transaction for LmdbTx<'_> { let Some(wtxn) = self.wtxn.as_ref() else { unreachable!("transaction was already committed") }; - let raw = self + let encoded = self .meta .get(wtxn, name.as_bytes()) .map_err(backend)? .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?; - let entry = decode_meta(raw)?; + let entry = decode_meta(encoded)?; self.next_ids.insert(name.to_string(), entry); entry }; diff --git a/crates/storage/src/adapters/sqlite.rs b/crates/storage/src/adapters/sqlite.rs index a561000..5646048 100644 --- a/crates/storage/src/adapters/sqlite.rs +++ b/crates/storage/src/adapters/sqlite.rs @@ -1,4 +1,4 @@ -//! SQLite adapter via the `rusqlite` crate (bundled libsqlite3). +//! `SQLite` adapter via the `rusqlite` crate (bundled libsqlite3). //! //! Storage layout: //! @@ -35,13 +35,13 @@ CREATE TABLE IF NOT EXISTS __rows ( ); "; -/// SQLite-backed [`Storage`] implementation. +/// `SQLite`-backed [`Storage`] implementation. pub struct SqliteStorage { conn: Connection, } impl SqliteStorage { - /// Open or create a SQLite database at `path`. Pass `":memory:"` for + /// Open or create a `SQLite` database at `path`. Pass `":memory:"` for /// an in-process database (useful in tests). /// /// # Errors diff --git a/tools/exporter/cabal.project b/tools/exporter/cabal.project index d2d5aa3..35a9fe7 100644 --- a/tools/exporter/cabal.project +++ b/tools/exporter/cabal.project @@ -7,7 +7,7 @@ -- against. packages: - glog-exporter.cabal + plan-exporter.cabal ../../external/geolog/geolog-lang/geolog-lang.cabal ../../external/geolog/data-partition/data-partition.cabal ../../external/geolog/diagnostician/diagnostician.cabal diff --git a/tools/exporter/examples/cartesian.scenario.json b/tools/exporter/examples/cartesian.scenario.json new file mode 100644 index 0000000..3d5d2e3 --- /dev/null +++ b/tools/exporter/examples/cartesian.scenario.json @@ -0,0 +1,31 @@ +{ + "name": "cartesian", + "_description": "Two disjoint atoms over different tables. Exercises the 'stray' branch of planConjunction's spanning forest (no shared variables = no edge in the intersection graph) and the linear chain of natural-joins that fullJoinForest emits over disconnected components.", + "schema": { + "left": { "columns": [{"entity": "left"}] }, + "right": { "columns": [{"entity": "right"}] } + }, + "facts": { + "left": [ + [{"entity": ["left", 1]}], + [{"entity": ["left", 2]}] + ], + "right": [ + [{"entity": ["right", 10]}], + [{"entity": ["right", 20]}] + ] + }, + "atoms": [ + {"table": "left", "values": {"0": {"var": "a"}}}, + {"table": "right", "values": {"0": {"var": "b"}}} + ], + "expected_bindings": { + "columns": ["a", "b"], + "rows": [ + [{"entity": ["left", 1]}, {"entity": ["right", 10]}], + [{"entity": ["left", 1]}, {"entity": ["right", 20]}], + [{"entity": ["left", 2]}, {"entity": ["right", 10]}], + [{"entity": ["left", 2]}, {"entity": ["right", 20]}] + ] + } +} diff --git a/tools/exporter/examples/self_loop.scenario.json b/tools/exporter/examples/self_loop.scenario.json new file mode 100644 index 0000000..8d4b52d --- /dev/null +++ b/tools/exporter/examples/self_loop.scenario.json @@ -0,0 +1,24 @@ +{ + "name": "self-loop", + "_description": "Single-atom query with a repeated variable across two columns: edge(x, x, _). Exercises evalAtom's equality-enforcement path; the planner emits one PlanEvalAtom node and no joins.", + "schema": { + "edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] } + }, + "facts": { + "edge": [ + [{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}], + [{"entity": ["node", 2]}, {"entity": ["node", 2]}, {"entity": ["edge", 2]}], + [{"entity": ["node", 3]}, {"entity": ["node", 3]}, {"entity": ["edge", 3]}] + ] + }, + "atoms": [ + {"table": "edge", "values": {"0": {"var": "x"}, "1": {"var": "x"}}} + ], + "expected_bindings": { + "columns": ["x"], + "rows": [ + [{"entity": ["node", 2]}], + [{"entity": ["node", 3]}] + ] + } +} diff --git a/tools/exporter/examples/three_atom_chain.scenario.json b/tools/exporter/examples/three_atom_chain.scenario.json new file mode 100644 index 0000000..65143c7 --- /dev/null +++ b/tools/exporter/examples/three_atom_chain.scenario.json @@ -0,0 +1,29 @@ +{ + "name": "three-atom-chain", + "schema": { + "node": { "columns": [{"entity": "node"}] }, + "edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] } + }, + "facts": { + "node": [ + [{"entity": ["node", 1]}], + [{"entity": ["node", 2]}], + [{"entity": ["node", 3]}] + ], + "edge": [ + [{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}], + [{"entity": ["node", 2]}, {"entity": ["node", 3]}, {"entity": ["edge", 2]}] + ] + }, + "atoms": [ + {"table": "node", "values": {"0": {"var": "a"}}}, + {"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}}, + {"table": "edge", "values": {"0": {"var": "b"}, "1": {"var": "c"}}} + ], + "expected_bindings": { + "columns": ["a", "b", "c"], + "rows": [ + [{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["node", 3]}] + ] + } +} diff --git a/tools/exporter/examples/two_atom_join.scenario.json b/tools/exporter/examples/two_atom_join.scenario.json new file mode 100644 index 0000000..05f190c --- /dev/null +++ b/tools/exporter/examples/two_atom_join.scenario.json @@ -0,0 +1,28 @@ +{ + "name": "two-atom-join", + "schema": { + "node": { "columns": [{"entity": "node"}] }, + "edge": { "columns": [{"entity": "node"}, {"entity": "node"}, {"entity": "edge"}] } + }, + "facts": { + "node": [ + [{"entity": ["node", 1]}], + [{"entity": ["node", 2]}] + ], + "edge": [ + [{"entity": ["node", 1]}, {"entity": ["node", 2]}, {"entity": ["edge", 1]}], + [{"entity": ["node", 2]}, {"entity": ["node", 1]}, {"entity": ["edge", 2]}] + ] + }, + "atoms": [ + {"table": "node", "values": {"0": {"var": "a"}}}, + {"table": "edge", "values": {"0": {"var": "a"}, "1": {"var": "b"}}} + ], + "expected_bindings": { + "columns": ["a", "b"], + "rows": [ + [{"entity": ["node", 1]}, {"entity": ["node", 2]}], + [{"entity": ["node", 2]}, {"entity": ["node", 1]}] + ] + } +} diff --git a/tools/exporter/glog-exporter.cabal b/tools/exporter/plan-exporter.cabal similarity index 51% rename from tools/exporter/glog-exporter.cabal rename to tools/exporter/plan-exporter.cabal index 8dd2aae..95ae2f3 100644 --- a/tools/exporter/glog-exporter.cabal +++ b/tools/exporter/plan-exporter.cabal @@ -1,19 +1,20 @@ cabal-version: 3.4 -name: glog-exporter +name: plan-exporter version: 0.1.0.0 license: MIT OR Apache-2.0 author: storage-engine-playground -synopsis: Export geolog-lang join plans as JSON for the Rust runner. +synopsis: Export conjunctive-query plans as JSON for the Rust plan-runner. description: - Builds a FlatTheory + facts + a list of QAtoms for a named scenario, - runs Geolog.DB.Plan.planConjunction, and emits a JSON document that - crates/glog-runner consumes. This allows the playground use query-ops and - storage end-to-end with a real Yannakakis plan produced by the geolog - frontend, not a hand-written fixture. + Reads a scenario (FlatTheory + facts + a list of QAtoms) from JSON, + runs Geolog.DB.Plan.planConjunction, and emits a plan IR JSON document + that crates/plan-runner consumes. The IR is the contract between the + Haskell frontend and the Rust executor; this tool is currently the only + producer, but any frontend that emits the same JSON shape can drive the + runner. build-type: Simple -executable glog-export +executable plan-export main-is: Main.hs hs-source-dirs: src default-language: GHC2024 @@ -32,5 +33,6 @@ executable glog-export , base , bytestring , containers + , fnotation , geolog-lang , text diff --git a/tools/exporter/src/Main.hs b/tools/exporter/src/Main.hs index d9d204d..8276f14 100644 --- a/tools/exporter/src/Main.hs +++ b/tools/exporter/src/Main.hs @@ -1,31 +1,41 @@ --- | Exports a geolog-lang join plan as JSON for the Rust runner in --- @crates/glog-runner@. +-- | Reads a @.scenario.json@ example, plans its conjunction with +-- @Geolog.DB.Plan.planConjunction@, and writes a runner-IR JSON plan that +-- @crates\/plan-runner@ consumes. -- -- Invocation: -- -- @ --- cabal run glog-export -- > plan.json +-- cabal run plan-export -- -- @ -- --- Available scenarios: @three-atom-chain@. +-- The scenario format is documented in @examples\/README@ or by example +-- (@examples\/*.scenario.json@); the output shape is documented in +-- @crates\/plan-runner\/src\/lib.rs@. -- --- The output shape is documented in @crates\/glog-runner\/src\/lib.rs@. --- This program is the canonical producer: any change to the IR should --- start here, with the Rust runner updated to match. +-- The exporter is also a self-check: before emitting, it runs the planned +-- query through @evalConjunctionPlanned@ and verifies the bindings match +-- the scenario's @expected_bindings@. A mismatched scenario fails loudly +-- here rather than handing a bad fixture to the Rust runner. module Main (main) where import Algebra.Graph qualified as AG -import Data.Aeson ((.=)) +import Control.Monad (unless) +import Data.Aeson ((.!=), (.:), (.:?), (.=)) import Data.Aeson qualified as Aeson import Data.Aeson.Encode.Pretty qualified as AesonPretty import Data.Aeson.Key qualified as Key +import Data.Aeson.KeyMap qualified as KM +import Data.Aeson.Types (Parser) import Data.ByteString.Lazy.Char8 qualified as LBS8 +import Data.Foldable (toList) import Data.List (sortOn) import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Text (Text) import Data.Text qualified as T +import Data.String (fromString) +import FNotation.Names (Name) import Geolog.DB.InMemory import Geolog.DB.Plan import Geolog.IR qualified as IR @@ -33,74 +43,142 @@ import System.Environment (getArgs) import System.Exit (die) import System.IO (hPutStrLn, stderr) --- * Scenario plumbing +-- * Scenario file format -- --- A scenario fixes a schema, a set of ground facts, and a conjunction of --- query atoms. The exporter is intentionally code-driven (not @.glog@ --- driven): @.glog@ files declare theories, not queries, so the query --- side has to live in Haskell either way. +-- Mirrors @Geolog.IR.FlatTheory@ + @[(Path, [Val])]@ + @[QAtom]@. The +-- 'Expected' block is optional but, when present, the exporter cross- +-- checks it against the planner's own evaluation before emitting. data Scenario = Scenario - { scName :: String - , scTheory :: IR.FlatTheory + { scName :: Text + , scSchema :: Map IR.Path SchemaEntry , scFacts :: [(IR.Path, [Val])] , scAtoms :: [QAtom] + , scExpected :: Maybe Expected } + deriving (Show) --- * three-atom-chain --- --- Mirrors @DB.InMemoryTest@ "matches evalConjunction on three-atom chain". --- node = {e1, e2, e3}, edge = {(e1,e2,ee1), (e2,e3,ee2)}. --- Conjunction: node(a), edge(a, b, _), edge(b, c, _). +data SchemaEntry = SchemaEntry + { seColumns :: [IR.ColType] + , sePrimaryKey :: Maybe [Int] + } + deriving (Show) -nodePath, edgePath :: IR.Path -nodePath = ["node"] -edgePath = ["edge"] +data Expected = Expected + { exColumns :: [Text] + , exRows :: [[Val]] + } + deriving (Show) -threeAtomChain :: Scenario -threeAtomChain = - Scenario - { scName = "three-atom-chain" - , scTheory = - IR.FlatTheory - { tables = - Map.fromList - [ (nodePath, IR.Table {columns = [IR.EntityType nodePath], primaryKey = Nothing}) - , (edgePath, IR.Table {columns = [IR.EntityType nodePath, IR.EntityType nodePath, IR.EntityType edgePath], primaryKey = Nothing}) - ] - , laws = Map.empty - } - , scFacts = - [ (nodePath, [ValEntity nodePath 1]) - , (nodePath, [ValEntity nodePath 2]) - , (nodePath, [ValEntity nodePath 3]) - , (edgePath, [ValEntity nodePath 1, ValEntity nodePath 2, ValEntity edgePath 1]) - , (edgePath, [ValEntity nodePath 2, ValEntity nodePath 3, ValEntity edgePath 2]) - ] - , scAtoms = - [ QAtom {qaTable = nodePath, qaRowId = Nothing, qaValues = Map.singleton 0 (QVar (Var "a"))} - , QAtom {qaTable = edgePath, qaRowId = Nothing, qaValues = Map.fromList [(0, QVar (Var "a")), (1, QVar (Var "b"))]} - , QAtom {qaTable = edgePath, qaRowId = Nothing, qaValues = Map.fromList [(0, QVar (Var "b")), (1, QVar (Var "c"))]} - ] +-- ** JSON parsers + +parsePath :: Aeson.Value -> Parser IR.Path +parsePath = Aeson.withText "path" \t -> pure [nameFromText t] + +-- | Build a single-segment 'Name' from text. Multi-segment names (which +-- would carry a non-empty 'init' field) aren't needed by any current +-- example; if a scenario wants @"a/b"@-style paths, extend this helper. +nameFromText :: Text -> Name +nameFromText = fromString . T.unpack + +instance Aeson.FromJSON SchemaEntry where + parseJSON = Aeson.withObject "SchemaEntry" \o -> + SchemaEntry <$> o .: "columns" <*> o .:? "primaryKey" + +instance Aeson.FromJSON IR.ColType where + parseJSON = Aeson.withObject "ColType" \o -> do + case KM.toList o of + [("entity", v)] -> IR.EntityType <$> parsePath v + [("prim", v)] -> IR.PrimType <$> parsePrim v + _ -> fail "ColType: expected {\"entity\": } or {\"prim\": \"int\"|\"string\"}" + +parsePrim :: Aeson.Value -> Parser IR.PrimType +parsePrim = Aeson.withText "prim type" \case + "int" -> pure IR.PrimInt + "string" -> pure IR.PrimString + other -> fail ("unknown primitive type: " <> T.unpack other) + +parseVal :: Aeson.Value -> Parser Val +parseVal = Aeson.withObject "Val" \o -> + case KM.toList o of + [("int", v)] -> ValInt <$> Aeson.parseJSON v + [("str", v)] -> ValText <$> Aeson.parseJSON v + [("entity", v)] -> parseEntity v + _ -> fail "Val: expected {\"int\": ..} | {\"str\": ..} | {\"entity\": [, ]}" + where + parseEntity = Aeson.withArray "entity" \arr -> case toList arr of + [pv, nv] -> do + p <- parsePath pv + n <- Aeson.parseJSON nv + pure (ValEntity p n) + _ -> fail "entity: expected [, ]" + +parseQVal :: Aeson.Value -> Parser QVal +parseQVal = Aeson.withObject "QVal" \o -> + case KM.toList o of + [("var", v)] -> QVar . Var <$> Aeson.parseJSON v + [("lit", v)] -> QLit <$> parseVal v + _ -> fail "QVal: expected {\"var\": \"name\"} or {\"lit\": }" + +parseAtom :: Aeson.Value -> Parser QAtom +parseAtom = Aeson.withObject "QAtom" \o -> do + qaTable <- o .: "table" >>= parsePath + qaRowId <- o .:? "rowId" >>= traverse parseQVal + values <- o .: "values" :: Parser (Map Text Aeson.Value) + qaValues <- + Map.fromList + <$> traverse + ( \(k, v) -> case reads (T.unpack k) of + [(i, "")] -> (i,) <$> parseQVal v + _ -> fail ("non-integer key in atom values: " <> T.unpack k) + ) + (Map.toList values) + pure QAtom {qaTable, qaRowId, qaValues} + +parseExpected :: Aeson.Value -> Parser Expected +parseExpected = Aeson.withObject "Expected" \o -> do + exColumns <- o .: "columns" + rawRows <- o .: "rows" :: Parser [[Aeson.Value]] + exRows <- traverse (traverse parseVal) rawRows + pure Expected {exColumns, exRows} + +instance Aeson.FromJSON Scenario where + parseJSON = Aeson.withObject "Scenario" \o -> do + scName <- o .:? "name" .!= "unnamed" + rawSchema <- o .: "schema" :: Parser (Map Text SchemaEntry) + let scSchema = Map.fromList [([nameFromText k], v) | (k, v) <- Map.toList rawSchema] + rawFacts <- o .:? "facts" .!= mempty :: Parser (Map Text [[Aeson.Value]]) + scFacts <- + concat + <$> traverse + ( \(name, rows) -> do + let path = [nameFromText name] + parsedRows <- traverse (traverse parseVal) rows + pure [(path, row) | row <- parsedRows] + ) + (Map.toList rawFacts) + rawAtoms <- o .: "atoms" :: Parser [Aeson.Value] + scAtoms <- traverse parseAtom rawAtoms + scExpected <- o .:? "expected_bindings" >>= traverse parseExpected + pure Scenario {scName, scSchema, scFacts, scAtoms, scExpected} + +-- * Scenario → FlatTheory + DB + atoms + +toFlatTheory :: Scenario -> IR.FlatTheory +toFlatTheory sc = + IR.FlatTheory + { tables = Map.map (\e -> IR.Table {columns = seColumns e, primaryKey = sePrimaryKey e}) sc.scSchema + , laws = Map.empty } -scenarios :: [Scenario] -scenarios = [threeAtomChain] +populateDB :: Scenario -> DB +populateDB sc = foldl (\d (p, row) -> insertRow p row d) (fromTheory (toFlatTheory sc)) sc.scFacts --- * JSON encoding +-- * JSON encoding for the plan-runner IR -- --- The shape mirrors the IR in @crates/glog-runner/src/lib.rs@: --- --- > { --- > "schema": {: , ...}, --- > "facts": {: [[, ...], ...], ...}, --- > "query": {"root": , "nodes": [{"id": , "action": }, ...]} --- > } +-- The shape is the same one we settled on earlier; see +-- @crates/plan-runner/src/lib.rs@. --- | Render a 'Geolog.IR.Path' (a list of 'FNotation.Names.Name') as a flat --- string for use as a relation name on the Rust side. Each 'Name' is --- already shown with @\/@ between its own init segments and last, so we --- reuse 'show' and join Names with @\/@ too. pathText :: IR.Path -> Text pathText = T.intercalate "/" . map (T.pack . show) @@ -119,10 +197,6 @@ encodeTerm = \case QVar (Var name) -> Aeson.object ["var" .= name] QLit v -> Aeson.object ["lit" .= encodeValue v] --- | Flatten an atom into one term per stored column, mirroring --- @Geolog.DB.InMemory.toFlatArgs@: @qaValues@ keys map to positions --- @0..n-2@, @qaRowId@ (if present) maps to position @n-1@, and any --- missing positions become wildcard variables with locally-unique names. flattenAtom :: Int -> Int -> QAtom -> [Aeson.Value] flattenAtom atomIdx arity qa = [ encodeTerm (Map.findWithDefault (wildcard atomIdx pos) pos merged) @@ -145,9 +219,6 @@ encodeAtom tables atomIdx qa = Just t -> length t.columns Nothing -> error ("encodeAtom: unknown table " <> show qa.qaTable) --- | Stable atom indexing keyed by atom identity, so the wildcard names in --- @flattenAtom@ are deterministic across runs even if the planner's node --- ordering changes. atomIndex :: [QAtom] -> Map QAtom Int atomIndex atoms = Map.fromList (zip (Set.toList (Set.fromList atoms)) [0 ..]) @@ -176,9 +247,6 @@ encodeNode tables idx n = ] ] --- | Render a 'PlanGraph' as the JSON the runner consumes. Empty graphs --- produce @{"root": 0, "nodes": []}@, which the runner treats as a --- well-formed but empty query. encodeQuery :: Map IR.Path IR.Table -> Map QAtom Int -> PlanGraph -> Aeson.Value encodeQuery tables idx (PlanGraph g) | null nodes = @@ -192,24 +260,30 @@ encodeQuery tables idx (PlanGraph g) nodes = sortOn (.graphId.unPlanNodeId) (AG.vertexList g) rootId = case graphRoot (PlanGraph g) of Just (PlanNodeId i) -> i - -- Non-empty graph with no topological root means a cycle, which - -- planConjunction never produces. Fall back to the last id rather - -- than crashing so a bug here is still inspectable. Nothing -> (.graphId.unPlanNodeId) (last nodes) +encodeExpected :: Expected -> Aeson.Value +encodeExpected ex = + Aeson.object + [ "columns" .= exColumns ex + , "rows" .= map (map encodeValue) (exRows ex) + ] + encodePlan :: Scenario -> Aeson.Value encodePlan sc = Aeson.object - [ "_scenario" .= sc.scName - , "schema" .= Aeson.object - [pathKey p .= length t.columns | (p, t) <- Map.toList sc.scTheory.tables] - , "facts" .= Aeson.object - [pathKey p .= map (map encodeValue) rows | (p, rows) <- groupedFacts sc.scFacts] - , "query" .= encodeQuery sc.scTheory.tables (atomIndex sc.scAtoms) (planConjunction sc.scAtoms) - ] + ( [ "_scenario" .= scName sc + , "schema" .= Aeson.object [pathKey p .= length (seColumns t) | (p, t) <- Map.toList sc.scSchema] + , "facts" + .= Aeson.object + [ pathKey p .= map (map encodeValue) rows + | (p, rows) <- groupedFacts sc.scFacts + ] + , "query" .= encodeQuery (toFlatTheory sc).tables (atomIndex sc.scAtoms) (planConjunction sc.scAtoms) + ] + ++ maybe [] (\e -> ["expected_bindings" .= encodeExpected e]) sc.scExpected + ) --- | Group facts by table while preserving table-first-seen order and --- per-table insertion order. groupedFacts :: [(IR.Path, [Val])] -> [(IR.Path, [[Val]])] groupedFacts = go [] where @@ -222,17 +296,45 @@ groupedFacts = go [] -- * Self-check -- --- Run the planner's @evalConjunctionPlanned@ against the scenario's DB --- to confirm the plan we're about to emit is well-formed and produces --- non-error output. Catches malformed scenarios before they hand a bad --- plan to the Rust runner. +-- Cross-check the planned bindings against any user-supplied +-- 'expected_bindings'. Detects two classes of bug before they reach the +-- Rust side: a scenario whose 'expected' is wrong, and a planner output +-- that disagrees with 'evalConjunction'. selfCheck :: Scenario -> IO () selfCheck sc = do - let db = foldl (\d (p, row) -> insertRow p row d) (fromTheory sc.scTheory) sc.scFacts + let db = populateDB sc case evalConjunctionPlanned db sc.scAtoms of - Left err -> die ("self-check failed for " <> sc.scName <> ": " <> show err) - Right _ -> pure () + Left err -> die ("self-check failed for " <> T.unpack sc.scName <> ": " <> show err) + Right actual -> case sc.scExpected of + Nothing -> pure () + Just expected -> verifyAgainstExpected sc.scName expected actual + +verifyAgainstExpected :: Text -> Expected -> Bindings -> IO () +verifyAgainstExpected name expected actual = do + let actualCols = actual.cols + expectedCols = Set.fromList (map Var (exColumns expected)) + unless (Set.isSubsetOf expectedCols actualCols) $ + die $ + "self-check failed for " + <> T.unpack name + <> ": expected_bindings names columns not produced by the plan: " + <> show (Set.difference expectedCols actualCols) + let projectedActual = Set.map (`projectOn` exColumns expected) actual.table + expectedProjected = Set.fromList (map (zip (exColumns expected)) (exRows expected)) + expectedSet = Set.map (Map.fromList . map (\(v, x) -> (Var v, x))) expectedProjected + unless (projectedActual == expectedSet) $ + die $ + "self-check failed for " + <> T.unpack name + <> ":\n expected: " + <> show expectedSet + <> "\n actual: " + <> show projectedActual + +projectOn :: Map Var Val -> [Text] -> Map Var Val +projectOn row keys = + Map.fromList [(Var k, v) | k <- keys, Just v <- [Map.lookup (Var k) row]] -- * Entry point @@ -240,13 +342,13 @@ main :: IO () main = do args <- getArgs case args of - [name] -> case lookup name [(s.scName, s) | s <- scenarios] of - Just sc -> do - selfCheck sc - LBS8.putStrLn (AesonPretty.encodePretty (encodePlan sc)) - Nothing -> - die ("unknown scenario: " <> name <> "\navailable: " <> unwords (map (.scName) scenarios)) + [path] -> do + raw <- LBS8.readFile path + sc <- case Aeson.eitherDecode raw of + Left err -> die ("failed to parse " <> path <> ": " <> err) + Right sc -> pure sc + selfCheck sc + LBS8.putStrLn (AesonPretty.encodePretty (encodePlan sc)) _ -> do - hPutStrLn stderr "usage: glog-export " - hPutStrLn stderr ("scenarios: " <> unwords (map (.scName) scenarios)) + hPutStrLn stderr "usage: plan-export " die ""