Make the demo executions more verbose

This commit is contained in:
Hassan Abedi 2026-06-15 15:32:57 +02:00
parent 115b3ff6f9
commit 7bfd582884
4 changed files with 351 additions and 22 deletions

View File

@ -99,6 +99,35 @@ export-fixtures: ## Regenerate plan JSON for every tools/exporter/examples/*.sce
examples: export-fixtures ## Regenerate fixtures from scenarios and run them through plan-runner against their oracles.
@cargo test -p plan-runner --test examples
DEMO_SCENARIO ?= two_atom_join
.PHONY: demo
demo: ## Run the end-to-end demo: query -> geolog plan -> Geomerge storage -> execution -> verified results
@if [ -z "$(HAS_CARGO)" ]; then \
echo "No Cargo.toml found. Skipping demo."; \
exit 1; \
fi
@echo "=== End-to-end demo: query -> plan -> Geomerge -> results ==="
@echo ""
@python3 -c 'import json, sys; \
s = json.load(open(sys.argv[1])); \
arity = lambda t: len(s["schema"][t]["columns"]); \
term = lambda a, i: a["values"].get(str(i), {}).get("var", "_"); \
atoms = ", ".join(a["table"] + "(" + ", ".join(term(a, i) for i in range(arity(a["table"]))) + ")" for a in s["atoms"]); \
print("query (" + sys.argv[1] + "):"); \
print(" " + atoms)' tools/exporter/examples/$(DEMO_SCENARIO).scenario.json
@echo ""
@echo "The committed plan below was produced from this query by the geolog"
@echo "planner via tools/exporter (make export-fixtures regenerates it)."
@echo ""
@result=$$(cargo run -q -p plan-runner -- --trace --backend geomerge crates/plan-runner/fixtures/$(DEMO_SCENARIO).json); \
echo ""; \
echo "result bindings:"; \
echo "$$result" | python3 -m json.tool --indent 2
@echo ""
@echo "Inspect the plan visually: run 'make viewer' and open"
@echo " http://localhost:$(VIEWER_PORT)/tools/plan-viewer/index.html?fixture=../../crates/plan-runner/fixtures/$(DEMO_SCENARIO).json"
.PHONY: viewer-test
viewer-test: ## Check the plan viewer's JS engine against every fixture oracle (needs Node)
@if ! command -v node >/dev/null 2>&1; then \

View File

@ -56,6 +56,9 @@ pub struct Plan {
/// scenario's `expected_bindings` block into this field.
#[serde(default)]
pub expected_bindings: Option<ExpectedBindings>,
/// Scenario label the exporter writes as `_scenario`. Display only.
#[serde(default, rename = "_scenario")]
pub scenario: Option<String>,
}
/// Expected query result, projected to a named subset of variables. The
@ -367,6 +370,22 @@ pub fn build_tables(plan: &Plan) -> Result<HashMap<String, Table>, RunError> {
Ok(tables)
}
/// One step of [`build_tables_via_storage_observed`], reported in the order
/// it happens. Lets callers narrate storage activity (the `plan-run` CLI's
/// `--trace` flag) without the loader printing anything itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadEvent {
/// A relation was declared in the backend.
CreateRelation { relation: String, arity: usize },
/// All fact rows for one relation were inserted into the open
/// transaction.
Insert { relation: String, rows: usize },
/// The fact-loading transaction was committed.
Commit,
/// A relation was scanned back out of the backend into a [`Table`].
ScanTable { relation: String, rows: usize },
}
/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then
/// materialize each declared relation back into an in-memory [`Table`] via
/// [`scan_as_table`]. The returned map is the same shape [`execute`]
@ -383,12 +402,36 @@ pub fn build_tables_via_storage<S: Storage>(
plan: &Plan,
storage: &mut S,
) -> Result<HashMap<String, Table>, RunError> {
for (name, arity) in &plan.schema {
storage.create_relation(name, *arity)?;
build_tables_via_storage_observed(plan, storage, &mut |_| {})
}
/// [`build_tables_via_storage`] with an observer that receives a
/// [`LoadEvent`] for every storage operation. Relations are processed in
/// sorted name order so the event stream is deterministic.
///
/// # Errors
/// Same conditions as [`build_tables_via_storage`].
pub fn build_tables_via_storage_observed<S: Storage>(
plan: &Plan,
storage: &mut S,
observer: &mut dyn FnMut(LoadEvent),
) -> Result<HashMap<String, Table>, RunError> {
let mut names: Vec<&String> = plan.schema.keys().collect();
names.sort();
for name in &names {
let arity = plan.schema[name.as_str()];
storage.create_relation(name, arity)?;
observer(LoadEvent::CreateRelation {
relation: (*name).clone(),
arity,
});
}
{
let mut tx = storage.transaction()?;
for (name, rows) in &plan.facts {
let mut fact_names: Vec<&String> = plan.facts.keys().collect();
fact_names.sort();
for name in fact_names {
let rows = &plan.facts[name.as_str()];
let Some(&arity) = plan.schema.get(name) else {
return Err(RunError::UnknownRelation(name.clone()));
};
@ -403,13 +446,22 @@ pub fn build_tables_via_storage<S: Storage>(
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
tx.insert(name, cells)?;
}
observer(LoadEvent::Insert {
relation: name.clone(),
rows: rows.len(),
});
}
tx.commit()?;
observer(LoadEvent::Commit);
}
let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
for name in plan.schema.keys() {
for name in &names {
let table = scan_as_table(storage as &dyn Storage, name)?;
tables.insert(name.clone(), table);
observer(LoadEvent::ScanTable {
relation: (*name).clone(),
rows: table.rows.len(),
});
tables.insert((*name).clone(), table);
}
Ok(tables)
}
@ -434,6 +486,22 @@ pub fn execute<S: std::hash::BuildHasher>(
tables: &HashMap<String, Table, S>,
query: &Query,
) -> Result<Relation, RunError> {
let mut results = execute_all(tables, query)?;
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
}
/// [`execute`], but returning the relation computed at every plan node, not
/// only the root. Keyed by node id. Useful for tracing and explanation,
/// where the intermediate relations are the point.
///
/// # Errors
/// Same conditions as [`execute`].
pub fn execute_all<S: std::hash::BuildHasher>(
tables: &HashMap<String, Table, S>,
query: &Query,
) -> Result<HashMap<u32, Relation>, RunError> {
let mut seen_ids: std::collections::HashSet<u32> =
std::collections::HashSet::with_capacity(query.nodes.len());
for node in &query.nodes {
@ -480,9 +548,7 @@ pub fn execute<S: std::hash::BuildHasher>(
results.insert(node.id, computed);
}
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
Ok(results)
}
fn require_dep<'a>(

View File

@ -2,6 +2,13 @@
//! it against the chosen backend, and print the resulting binding relation
//! as JSON on stdout.
//!
//! With `--trace`, every pipeline stage is narrated on stderr: the plan
//! summary, each storage operation (relation creation, transactional
//! inserts, commit, scan-back), the relation computed at every plan node,
//! and the oracle check against `expected_bindings`. A failed oracle check
//! exits nonzero. Stdout still carries only the result JSON, so traced
//! output remains pipeable.
//!
//! Backends:
//!
//! - `memory` (default): build tables straight from the plan's `facts`
@ -19,7 +26,11 @@ use std::collections::HashMap;
use std::io::{self, Read};
use std::process::ExitCode;
use plan_runner::{JsonValue, Plan, build_tables, build_tables_via_storage, execute, parse_plan};
use plan_runner::{
Action, JsonValue, LoadEvent, Plan, build_tables, build_tables_via_storage_observed,
execute_all, parse_plan, verify,
};
use query_ops::relation::Relation;
use storage::MemoryStorage;
use storage::adapters::fjall::FjallStorage;
use storage::adapters::geomerge::{ColumnKind, GeomergeStorage};
@ -58,8 +69,23 @@ impl Backend {
const BACKENDS_HELP: &str = "memory|memory-storage|lmdb|redb|fjall|sqlite|geomerge";
impl Backend {
fn name(self) -> &'static str {
match self {
Self::Memory => "memory",
Self::MemoryStorage => "memory-storage",
Self::Lmdb => "lmdb",
Self::Redb => "redb",
Self::Fjall => "fjall",
Self::Sqlite => "sqlite",
Self::Geomerge => "geomerge",
}
}
}
fn main() -> ExitCode {
let mut backend = Backend::Memory;
let mut trace = false;
let mut input_path: Option<String> = None;
let mut args = std::env::args().skip(1);
while let Some(arg) = args.next() {
@ -75,6 +101,7 @@ fn main() -> ExitCode {
};
backend = parsed;
}
"--trace" => trace = true,
other if input_path.is_none() => input_path = Some(other.to_string()),
other => {
eprintln!("unexpected argument: {other}");
@ -83,7 +110,7 @@ fn main() -> ExitCode {
}
}
let Some(path) = input_path else {
eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] <plan.json | ->");
eprintln!("usage: plan-run [--backend {BACKENDS_HELP}] [--trace] <plan.json | ->");
return ExitCode::from(2);
};
@ -103,7 +130,11 @@ fn main() -> ExitCode {
}
};
let tables = match build_tables_for(&plan, backend) {
if trace {
trace_plan_header(&plan, backend);
}
let tables = match build_tables_for(&plan, backend, trace) {
Ok(t) => t,
Err(err) => {
eprintln!("{err}");
@ -111,13 +142,47 @@ fn main() -> ExitCode {
}
};
let relation = match execute(&tables, &plan.query) {
if trace {
eprintln!("execute:");
}
let mut results = match execute_all(&tables, &plan.query) {
Ok(r) => r,
Err(err) => {
eprintln!("execute error: {err}");
return ExitCode::from(1);
}
};
if trace {
trace_execution(&plan, &results);
}
let Some(relation) = results.remove(&plan.query.root) else {
eprintln!(
"execute error: plan root id {} matches no node",
plan.query.root
);
return ExitCode::from(1);
};
if trace {
match verify(&plan, &relation) {
Ok(true) => {
let expected = plan.expected_bindings.as_ref().map_or(0, |e| e.rows.len());
let columns = plan
.expected_bindings
.as_ref()
.map(|e| e.columns.join(", "))
.unwrap_or_default();
eprintln!(
"verify: bindings match expected_bindings ({expected} rows over {columns})"
);
}
Ok(false) => eprintln!("verify: no expected_bindings in plan, skipped"),
Err(err) => {
eprintln!("verify: FAILED: {err}");
return ExitCode::from(1);
}
}
}
let payload = serde_json::json!({
"columns": relation.columns,
@ -133,42 +198,59 @@ fn main() -> ExitCode {
/// Build the input tables for `plan` using `backend`. Path-based adapters
/// allocate a fresh tempdir; it drops at the end of this function, which is
/// safe because `build_tables_via_storage` fully materializes the tables
/// into owned `Vec<Value>` before returning.
fn build_tables_for(plan: &Plan, backend: Backend) -> Result<HashMap<String, Table>, String> {
/// safe because `build_tables_via_storage_observed` fully materializes the
/// tables into owned `Vec<Value>` before returning.
fn build_tables_for(
plan: &Plan,
backend: Backend,
trace: bool,
) -> Result<HashMap<String, Table>, String> {
let mut observer = |event: LoadEvent| {
if trace {
trace_load_event(&event);
}
};
if trace && !matches!(backend, Backend::Memory) {
eprintln!("storage:");
}
match backend {
Backend::Memory => build_tables(plan).map_err(|e| format!("build error: {e}")),
Backend::Memory => {
if trace {
eprintln!("storage: none (facts loaded directly into in-memory tables)");
}
build_tables(plan).map_err(|e| format!("build error: {e}"))
}
Backend::MemoryStorage => {
let mut storage = MemoryStorage::default();
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (memory-storage): {e}"))
}
Backend::Lmdb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = LmdbStorage::open(dir.path())
.map_err(|e| format!("failed to open lmdb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (lmdb): {e}"))
}
Backend::Redb => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = RedbStorage::open(dir.path().join("data.redb"))
.map_err(|e| format!("failed to open redb backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (redb): {e}"))
}
Backend::Fjall => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = FjallStorage::open(dir.path())
.map_err(|e| format!("failed to open fjall backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (fjall): {e}"))
}
Backend::Sqlite => {
let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
let mut storage = SqliteStorage::open(dir.path().join("data.sqlite"))
.map_err(|e| format!("failed to open sqlite backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (sqlite): {e}"))
}
Backend::Geomerge => {
@ -178,12 +260,89 @@ fn build_tables_for(plan: &Plan, backend: Backend) -> Result<HashMap<String, Tab
.map(|(name, &arity)| (name.clone(), infer_column_kinds(plan, name, arity)));
let mut storage = GeomergeStorage::with_relations(relations)
.map_err(|e| format!("failed to open geomerge backend: {e}"))?;
build_tables_via_storage(plan, &mut storage)
build_tables_via_storage_observed(plan, &mut storage, &mut observer)
.map_err(|e| format!("build error (geomerge): {e}"))
}
}
}
fn trace_plan_header(plan: &Plan, backend: Backend) {
if let Some(name) = &plan.scenario {
eprintln!("scenario: {name}");
}
eprintln!(
"plan: {} nodes, root #{}",
plan.query.nodes.len(),
plan.query.root
);
let mut schema: Vec<String> = plan
.schema
.iter()
.map(|(name, arity)| format!("{name}/{arity}"))
.collect();
schema.sort();
eprintln!("schema: {}", schema.join(", "));
eprintln!("backend: {}", backend.name());
}
fn trace_load_event(event: &LoadEvent) {
match event {
LoadEvent::CreateRelation { relation, arity } => {
eprintln!(" create_relation {relation}/{arity}");
}
LoadEvent::Insert { relation, rows } => {
eprintln!(" tx insert {relation}: {rows} rows");
}
LoadEvent::Commit => eprintln!(" tx commit"),
LoadEvent::ScanTable { relation, rows } => {
eprintln!(" scan_as_table {relation} -> {rows} rows");
}
}
}
/// Per-node execution trace, in ascending id order (the execution order).
fn trace_execution(plan: &Plan, results: &HashMap<u32, Relation>) {
let mut nodes: Vec<_> = plan.query.nodes.iter().collect();
nodes.sort_by_key(|n| n.id);
for node in nodes {
let Some(relation) = results.get(&node.id) else {
continue;
};
let description = match &node.action {
Action::Scan(atom) => format!("scan {}", atom.table),
Action::Join(join) => {
let shared = match (results.get(&join.left), results.get(&join.right)) {
(Some(left), Some(right)) => {
let names: Vec<&str> = left
.columns
.iter()
.filter(|c| right.columns.contains(c))
.map(String::as_str)
.collect();
if names.is_empty() {
"no shared columns (cartesian)".to_string()
} else {
format!("on {}", names.join(", "))
}
}
_ => String::new(),
};
format!("{:?} join #{} #{} {shared}", join.op, join.left, join.right).to_lowercase()
}
};
let root = if node.id == plan.query.root {
" (root)"
} else {
""
};
eprintln!(
" #{} {description} -> {} rows{root}",
node.id,
relation.rows.len()
);
}
}
/// Best-effort column type inference for `geomerge`'s synthesized theory.
/// The runner IR carries only arity, so we peek at the first fact row of
/// the relation. Columns without a sample default to `String`, which

View File

@ -0,0 +1,75 @@
//! Tests for `execute_all` and the observed storage loader, the two pieces
//! behind the `plan-run --trace` narration.
use std::fs;
use std::path::PathBuf;
use plan_runner::{
LoadEvent, build_tables, build_tables_via_storage_observed, execute, execute_all, parse_plan,
};
use storage::MemoryStorage;
fn fixture(name: &str) -> String {
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("fixtures")
.join(name);
fs::read_to_string(path).expect("read fixture")
}
#[test]
fn execute_all_covers_every_node_and_agrees_with_execute_on_the_root() {
let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture");
let tables = build_tables(&plan).expect("build tables");
let all = execute_all(&tables, &plan.query).expect("execute_all");
assert_eq!(all.len(), plan.query.nodes.len());
for node in &plan.query.nodes {
assert!(all.contains_key(&node.id), "missing node {}", node.id);
}
let root = execute(&tables, &plan.query).expect("execute");
assert_eq!(all[&plan.query.root].columns, root.columns);
assert_eq!(all[&plan.query.root].rows, root.rows);
}
#[test]
fn observed_loader_reports_storage_events_in_sorted_relation_order() {
let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture");
let mut storage = MemoryStorage::default();
let mut events = Vec::new();
let tables = build_tables_via_storage_observed(&plan, &mut storage, &mut |e| events.push(e))
.expect("load via storage");
assert_eq!(tables.len(), 2);
let expected = vec![
LoadEvent::CreateRelation {
relation: "edge".into(),
arity: 3,
},
LoadEvent::CreateRelation {
relation: "node".into(),
arity: 1,
},
LoadEvent::Insert {
relation: "edge".into(),
rows: 2,
},
LoadEvent::Insert {
relation: "node".into(),
rows: 2,
},
LoadEvent::Commit,
LoadEvent::ScanTable {
relation: "edge".into(),
rows: 2,
},
LoadEvent::ScanTable {
relation: "node".into(),
rows: 2,
},
];
assert_eq!(events, expected);
}
#[test]
fn scenario_label_round_trips_from_the_fixture() {
let plan = parse_plan(&fixture("two_atom_join.json")).expect("parse fixture");
assert_eq!(plan.scenario.as_deref(), Some("two-atom-join"));
}