From 7bfd582884a4d56839e6b82842a9b229c244a9f4 Mon Sep 17 00:00:00 2001 From: Hassan Abedi Date: Mon, 15 Jun 2026 15:32:57 +0200 Subject: [PATCH] Make the demo executions more verbose --- Makefile | 29 +++++ crates/plan-runner/src/lib.rs | 82 +++++++++++-- crates/plan-runner/src/main.rs | 187 +++++++++++++++++++++++++++--- crates/plan-runner/tests/trace.rs | 75 ++++++++++++ 4 files changed, 351 insertions(+), 22 deletions(-) create mode 100644 crates/plan-runner/tests/trace.rs diff --git a/Makefile b/Makefile index 702467c..98774a5 100644 --- a/Makefile +++ b/Makefile @@ -99,6 +99,35 @@ export-fixtures: ## Regenerate plan JSON for every tools/exporter/examples/*.sce examples: export-fixtures ## Regenerate fixtures from scenarios and run them through plan-runner against their oracles. @cargo test -p plan-runner --test examples +DEMO_SCENARIO ?= two_atom_join + +.PHONY: demo +demo: ## Run the end-to-end demo: query -> geolog plan -> Geomerge storage -> execution -> verified results + @if [ -z "$(HAS_CARGO)" ]; then \ + echo "No Cargo.toml found. Skipping demo."; \ + exit 1; \ + fi + @echo "=== End-to-end demo: query -> plan -> Geomerge -> results ===" + @echo "" + @python3 -c 'import json, sys; \ +s = json.load(open(sys.argv[1])); \ +arity = lambda t: len(s["schema"][t]["columns"]); \ +term = lambda a, i: a["values"].get(str(i), {}).get("var", "_"); \ +atoms = ", ".join(a["table"] + "(" + ", ".join(term(a, i) for i in range(arity(a["table"]))) + ")" for a in s["atoms"]); \ +print("query (" + sys.argv[1] + "):"); \ +print(" " + atoms)' tools/exporter/examples/$(DEMO_SCENARIO).scenario.json + @echo "" + @echo "The committed plan below was produced from this query by the geolog" + @echo "planner via tools/exporter (make export-fixtures regenerates it)." + @echo "" + @result=$$(cargo run -q -p plan-runner -- --trace --backend geomerge crates/plan-runner/fixtures/$(DEMO_SCENARIO).json); \ + echo ""; \ + echo "result bindings:"; \ + echo "$$result" | python3 -m json.tool --indent 2 + @echo "" + @echo "Inspect the plan visually: run 'make viewer' and open" + @echo " http://localhost:$(VIEWER_PORT)/tools/plan-viewer/index.html?fixture=../../crates/plan-runner/fixtures/$(DEMO_SCENARIO).json" + .PHONY: viewer-test viewer-test: ## Check the plan viewer's JS engine against every fixture oracle (needs Node) @if ! command -v node >/dev/null 2>&1; then \ diff --git a/crates/plan-runner/src/lib.rs b/crates/plan-runner/src/lib.rs index 7cdcf28..6c87635 100644 --- a/crates/plan-runner/src/lib.rs +++ b/crates/plan-runner/src/lib.rs @@ -56,6 +56,9 @@ pub struct Plan { /// scenario's `expected_bindings` block into this field. #[serde(default)] pub expected_bindings: Option, + /// Scenario label the exporter writes as `_scenario`. Display only. + #[serde(default, rename = "_scenario")] + pub scenario: Option, } /// Expected query result, projected to a named subset of variables. The @@ -367,6 +370,22 @@ pub fn build_tables(plan: &Plan) -> Result, RunError> { Ok(tables) } +/// One step of [`build_tables_via_storage_observed`], reported in the order +/// it happens. Lets callers narrate storage activity (the `plan-run` CLI's +/// `--trace` flag) without the loader printing anything itself. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LoadEvent { + /// A relation was declared in the backend. + CreateRelation { relation: String, arity: usize }, + /// All fact rows for one relation were inserted into the open + /// transaction. + Insert { relation: String, rows: usize }, + /// The fact-loading transaction was committed. + Commit, + /// A relation was scanned back out of the backend into a [`Table`]. + ScanTable { relation: String, rows: usize }, +} + /// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then /// materialize each declared relation back into an in-memory [`Table`] via /// [`scan_as_table`]. The returned map is the same shape [`execute`] @@ -383,12 +402,36 @@ pub fn build_tables_via_storage( plan: &Plan, storage: &mut S, ) -> Result, RunError> { - for (name, arity) in &plan.schema { - storage.create_relation(name, *arity)?; + build_tables_via_storage_observed(plan, storage, &mut |_| {}) +} + +/// [`build_tables_via_storage`] with an observer that receives a +/// [`LoadEvent`] for every storage operation. Relations are processed in +/// sorted name order so the event stream is deterministic. +/// +/// # Errors +/// Same conditions as [`build_tables_via_storage`]. +pub fn build_tables_via_storage_observed( + plan: &Plan, + storage: &mut S, + observer: &mut dyn FnMut(LoadEvent), +) -> Result, RunError> { + let mut names: Vec<&String> = plan.schema.keys().collect(); + names.sort(); + for name in &names { + let arity = plan.schema[name.as_str()]; + storage.create_relation(name, arity)?; + observer(LoadEvent::CreateRelation { + relation: (*name).clone(), + arity, + }); } { let mut tx = storage.transaction()?; - for (name, rows) in &plan.facts { + let mut fact_names: Vec<&String> = plan.facts.keys().collect(); + fact_names.sort(); + for name in fact_names { + let rows = &plan.facts[name.as_str()]; let Some(&arity) = plan.schema.get(name) else { return Err(RunError::UnknownRelation(name.clone())); }; @@ -403,13 +446,22 @@ pub fn build_tables_via_storage( let cells: Vec = row.iter().cloned().map(Value::from).collect(); tx.insert(name, cells)?; } + observer(LoadEvent::Insert { + relation: name.clone(), + rows: rows.len(), + }); } tx.commit()?; + observer(LoadEvent::Commit); } let mut tables: HashMap = HashMap::with_capacity(plan.schema.len()); - for name in plan.schema.keys() { + for name in &names { let table = scan_as_table(storage as &dyn Storage, name)?; - tables.insert(name.clone(), table); + observer(LoadEvent::ScanTable { + relation: (*name).clone(), + rows: table.rows.len(), + }); + tables.insert((*name).clone(), table); } Ok(tables) } @@ -434,6 +486,22 @@ pub fn execute( tables: &HashMap, query: &Query, ) -> Result { + let mut results = execute_all(tables, query)?; + results + .remove(&query.root) + .ok_or(RunError::MissingRoot(query.root)) +} + +/// [`execute`], but returning the relation computed at every plan node, not +/// only the root. Keyed by node id. Useful for tracing and explanation, +/// where the intermediate relations are the point. +/// +/// # Errors +/// Same conditions as [`execute`]. +pub fn execute_all( + tables: &HashMap, + query: &Query, +) -> Result, RunError> { let mut seen_ids: std::collections::HashSet = std::collections::HashSet::with_capacity(query.nodes.len()); for node in &query.nodes { @@ -480,9 +548,7 @@ pub fn execute( results.insert(node.id, computed); } - results - .remove(&query.root) - .ok_or(RunError::MissingRoot(query.root)) + Ok(results) } fn require_dep<'a>( diff --git a/crates/plan-runner/src/main.rs b/crates/plan-runner/src/main.rs index c76b440..ec010de 100644 --- a/crates/plan-runner/src/main.rs +++ b/crates/plan-runner/src/main.rs @@ -2,6 +2,13 @@ //! it against the chosen backend, and print the resulting binding relation //! as JSON on stdout. //! +//! With `--trace`, every pipeline stage is narrated on stderr: the plan +//! summary, each storage operation (relation creation, transactional +//! inserts, commit, scan-back), the relation computed at every plan node, +//! and the oracle check against `expected_bindings`. A failed oracle check +//! exits nonzero. Stdout still carries only the result JSON, so traced +//! output remains pipeable. +//! //! Backends: //! //! - `memory` (default): build tables straight from the plan's `facts` @@ -19,7 +26,11 @@ use std::collections::HashMap; use std::io::{self, Read}; use std::process::ExitCode; -use plan_runner::{JsonValue, Plan, build_tables, build_tables_via_storage, execute, parse_plan}; +use plan_runner::{ + Action, JsonValue, LoadEvent, Plan, build_tables, build_tables_via_storage_observed, + execute_all, parse_plan, verify, +}; +use query_ops::relation::Relation; use storage::MemoryStorage; use storage::adapters::fjall::FjallStorage; use storage::adapters::geomerge::{ColumnKind, GeomergeStorage}; @@ -58,8 +69,23 @@ impl Backend { const BACKENDS_HELP: &str = "memory|memory-storage|lmdb|redb|fjall|sqlite|geomerge"; +impl Backend { + fn name(self) -> &'static str { + match self { + Self::Memory => "memory", + Self::MemoryStorage => "memory-storage", + Self::Lmdb => "lmdb", + Self::Redb => "redb", + Self::Fjall => "fjall", + Self::Sqlite => "sqlite", + Self::Geomerge => "geomerge", + } + } +} + fn main() -> ExitCode { let mut backend = Backend::Memory; + let mut trace = false; let mut input_path: Option = None; let mut args = std::env::args().skip(1); while let Some(arg) = args.next() { @@ -75,6 +101,7 @@ fn main() -> ExitCode { }; backend = parsed; } + "--trace" => trace = true, other if input_path.is_none() => input_path = Some(other.to_string()), other => { eprintln!("unexpected argument: {other}"); @@ -83,7 +110,7 @@ fn main() -> ExitCode { } } let Some(path) = input_path else { - eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] "); + eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] [--trace] "); return ExitCode::from(2); }; @@ -103,7 +130,11 @@ fn main() -> ExitCode { } }; - let tables = match build_tables_for(&plan, backend) { + if trace { + trace_plan_header(&plan, backend); + } + + let tables = match build_tables_for(&plan, backend, trace) { Ok(t) => t, Err(err) => { eprintln!("{err}"); @@ -111,13 +142,47 @@ fn main() -> ExitCode { } }; - let relation = match execute(&tables, &plan.query) { + if trace { + eprintln!("execute:"); + } + let mut results = match execute_all(&tables, &plan.query) { Ok(r) => r, Err(err) => { eprintln!("execute error: {err}"); return ExitCode::from(1); } }; + if trace { + trace_execution(&plan, &results); + } + let Some(relation) = results.remove(&plan.query.root) else { + eprintln!( + "execute error: plan root id {} matches no node", + plan.query.root + ); + return ExitCode::from(1); + }; + + if trace { + match verify(&plan, &relation) { + Ok(true) => { + let expected = plan.expected_bindings.as_ref().map_or(0, |e| e.rows.len()); + let columns = plan + .expected_bindings + .as_ref() + .map(|e| e.columns.join(", ")) + .unwrap_or_default(); + eprintln!( + "verify: bindings match expected_bindings ({expected} rows over {columns})" + ); + } + Ok(false) => eprintln!("verify: no expected_bindings in plan, skipped"), + Err(err) => { + eprintln!("verify: FAILED: {err}"); + return ExitCode::from(1); + } + } + } let payload = serde_json::json!({ "columns": relation.columns, @@ -133,42 +198,59 @@ fn main() -> ExitCode { /// Build the input tables for `plan` using `backend`. Path-based adapters /// allocate a fresh tempdir; it drops at the end of this function, which is -/// safe because `build_tables_via_storage` fully materializes the tables -/// into owned `Vec` before returning. -fn build_tables_for(plan: &Plan, backend: Backend) -> Result, String> { +/// safe because `build_tables_via_storage_observed` fully materializes the +/// tables into owned `Vec` before returning. +fn build_tables_for( + plan: &Plan, + backend: Backend, + trace: bool, +) -> Result, String> { + let mut observer = |event: LoadEvent| { + if trace { + trace_load_event(&event); + } + }; + if trace && !matches!(backend, Backend::Memory) { + eprintln!("storage:"); + } match backend { - Backend::Memory => build_tables(plan).map_err(|e| format!("build error: {e}")), + Backend::Memory => { + if trace { + eprintln!("storage: none (facts loaded directly into in-memory tables)"); + } + build_tables(plan).map_err(|e| format!("build error: {e}")) + } Backend::MemoryStorage => { let mut storage = MemoryStorage::default(); - build_tables_via_storage(plan, &mut storage) + build_tables_via_storage_observed(plan, &mut storage, &mut observer) .map_err(|e| format!("build error (memory-storage): {e}")) } Backend::Lmdb => { let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; let mut storage = LmdbStorage::open(dir.path()) .map_err(|e| format!("failed to open lmdb backend: {e}"))?; - build_tables_via_storage(plan, &mut storage) + build_tables_via_storage_observed(plan, &mut storage, &mut observer) .map_err(|e| format!("build error (lmdb): {e}")) } Backend::Redb => { let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; let mut storage = RedbStorage::open(dir.path().join("data.redb")) .map_err(|e| format!("failed to open redb backend: {e}"))?; - build_tables_via_storage(plan, &mut storage) + build_tables_via_storage_observed(plan, &mut storage, &mut observer) .map_err(|e| format!("build error (redb): {e}")) } Backend::Fjall => { let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; let mut storage = FjallStorage::open(dir.path()) .map_err(|e| format!("failed to open fjall backend: {e}"))?; - build_tables_via_storage(plan, &mut storage) + build_tables_via_storage_observed(plan, &mut storage, &mut observer) .map_err(|e| format!("build error (fjall): {e}")) } Backend::Sqlite => { let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?; let mut storage = SqliteStorage::open(dir.path().join("data.sqlite")) .map_err(|e| format!("failed to open sqlite backend: {e}"))?; - build_tables_via_storage(plan, &mut storage) + build_tables_via_storage_observed(plan, &mut storage, &mut observer) .map_err(|e| format!("build error (sqlite): {e}")) } Backend::Geomerge => { @@ -178,12 +260,89 @@ fn build_tables_for(plan: &Plan, backend: Backend) -> Result = plan + .schema + .iter() + .map(|(name, arity)| format!("{name}/{arity}")) + .collect(); + schema.sort(); + eprintln!("schema: {}", schema.join(", ")); + eprintln!("backend: {}", backend.name()); +} + +fn trace_load_event(event: &LoadEvent) { + match event { + LoadEvent::CreateRelation { relation, arity } => { + eprintln!(" create_relation {relation}/{arity}"); + } + LoadEvent::Insert { relation, rows } => { + eprintln!(" tx insert {relation}: {rows} rows"); + } + LoadEvent::Commit => eprintln!(" tx commit"), + LoadEvent::ScanTable { relation, rows } => { + eprintln!(" scan_as_table {relation} -> {rows} rows"); + } + } +} + +/// Per-node execution trace, in ascending id order (the execution order). +fn trace_execution(plan: &Plan, results: &HashMap) { + let mut nodes: Vec<_> = plan.query.nodes.iter().collect(); + nodes.sort_by_key(|n| n.id); + for node in nodes { + let Some(relation) = results.get(&node.id) else { + continue; + }; + let description = match &node.action { + Action::Scan(atom) => format!("scan {}", atom.table), + Action::Join(join) => { + let shared = match (results.get(&join.left), results.get(&join.right)) { + (Some(left), Some(right)) => { + let names: Vec<&str> = left + .columns + .iter() + .filter(|c| right.columns.contains(c)) + .map(String::as_str) + .collect(); + if names.is_empty() { + "no shared columns (cartesian)".to_string() + } else { + format!("on {}", names.join(", ")) + } + } + _ => String::new(), + }; + format!("{:?} join #{} #{} {shared}", join.op, join.left, join.right).to_lowercase() + } + }; + let root = if node.id == plan.query.root { + " (root)" + } else { + "" + }; + eprintln!( + " #{} {description} -> {} rows{root}", + node.id, + relation.rows.len() + ); + } +} + /// Best-effort column type inference for `geomerge`'s synthesized theory. /// The runner IR carries only arity, so we peek at the first fact row of /// the relation. Columns without a sample default to `String`, which diff --git a/crates/plan-runner/tests/trace.rs b/crates/plan-runner/tests/trace.rs new file mode 100644 index 0000000..273c49d --- /dev/null +++ b/crates/plan-runner/tests/trace.rs @@ -0,0 +1,75 @@ +//! Tests for `execute_all` and the observed storage loader, the two pieces +//! behind the `plan-run --trace` narration. + +use std::fs; +use std::path::PathBuf; + +use plan_runner::{ + LoadEvent, build_tables, build_tables_via_storage_observed, execute, execute_all, parse_plan, +}; +use storage::MemoryStorage; + +fn fixture(name: &str) -> String { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("fixtures") + .join(name); + fs::read_to_string(path).expect("read fixture") +} + +#[test] +fn execute_all_covers_every_node_and_agrees_with_execute_on_the_root() { + let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture"); + let tables = build_tables(&plan).expect("build tables"); + let all = execute_all(&tables, &plan.query).expect("execute_all"); + assert_eq!(all.len(), plan.query.nodes.len()); + for node in &plan.query.nodes { + assert!(all.contains_key(&node.id), "missing node {}", node.id); + } + let root = execute(&tables, &plan.query).expect("execute"); + assert_eq!(all[&plan.query.root].columns, root.columns); + assert_eq!(all[&plan.query.root].rows, root.rows); +} + +#[test] +fn observed_loader_reports_storage_events_in_sorted_relation_order() { + let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture"); + let mut storage = MemoryStorage::default(); + let mut events = Vec::new(); + let tables = build_tables_via_storage_observed(&plan, &mut storage, &mut |e| events.push(e)) + .expect("load via storage"); + assert_eq!(tables.len(), 2); + let expected = vec![ + LoadEvent::CreateRelation { + relation: "edge".into(), + arity: 3, + }, + LoadEvent::CreateRelation { + relation: "node".into(), + arity: 1, + }, + LoadEvent::Insert { + relation: "edge".into(), + rows: 2, + }, + LoadEvent::Insert { + relation: "node".into(), + rows: 2, + }, + LoadEvent::Commit, + LoadEvent::ScanTable { + relation: "edge".into(), + rows: 2, + }, + LoadEvent::ScanTable { + relation: "node".into(), + rows: 2, + }, + ]; + assert_eq!(events, expected); +} + +#[test] +fn scenario_label_round_trips_from_the_fixture() { + let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture"); + assert_eq!(plan.scenario.as_deref(), Some("two-atom-join")); +}