2026-06-05 13:40:16 +02:00

541 lines
20 KiB
Rust

//! Snapshot executor for conjunctive-query plans.
//!
//! Takes a structural plan (a DAG of `Scan` and `Join` nodes), the input
//! tables it scans, and walks the DAG via [`query_ops::atom::scan_atom`],
//! [`query_ops::join::semijoin`], and [`query_ops::join::natural_join`].
//! The result is a binding [`Relation`](query_ops::relation::Relation) over
//! the query's variables.
//!
//! The executor is intentionally backend-agnostic: [`execute`] consumes only
//! in-memory [`Table`](storage::table::Table)s and the `query-ops`
//! operators, so the planner that emits the JSON IR stays decoupled from the
//! storage backend that produced the facts. To execute a plan against a
//! [`Storage`](storage::Storage) backend, materialize each input table with
//! [`storage::scan_as_table`] (as [`build_tables_via_storage`] does) and call
//! [`execute`] with the resulting map. The in-tree
//! `tests/storage_roundtrip.rs` is the canonical example.
//!
//! The JSON IR mirrors `Geolog.DB.Plan.PlanGraph` and
//! `Geolog.DB.InMemory.QAtom` from the `external/geolog` submodule, but the
//! shape is the contract: any frontend that emits this JSON can use the
//! runner.
//!
//! Operator mapping from the Haskell planner:
//!
//! | `Geolog.DB.Plan` | this crate |
//! |-----------------------------|-----------------------------------------------|
//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` |
//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` |
//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` |
//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` |
//!
//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`]
//! repeated across positions enforces equality, [`Term::Lit`] filters by
//! constant, and distinct variables project in first-occurrence order.
use std::collections::HashMap;
use serde::Deserialize;
use query_ops::atom::{AtomPattern, Term, scan_atom};
use query_ops::join::{natural_join, semijoin};
use query_ops::relation::Relation;
use storage::table::Table;
use storage::value::Value;
use storage::{Storage, StorageError, scan_as_table};
/// A single fixture: schema, ground facts, and a query plan to execute.
#[derive(Debug, Clone, Deserialize)]
pub struct Plan {
    /// Relation name → arity (column count).
    pub schema: HashMap<String, usize>,
    /// Relation name → list of ground tuples to insert before execution.
    ///
    /// Optional in the JSON: a missing `facts` key parses as an empty map,
    /// so every relation declared in `schema` starts out as an empty table.
    #[serde(default)]
    pub facts: HashMap<String, Vec<Vec<JsonValue>>>,
    /// The join DAG itself.
    pub query: Query,
    /// Optional oracle: if present, [`verify`] cross-checks an executed
    /// [`Relation`] against this projection. The exporter lifts the
    /// scenario's `expected_bindings` block into this field.
    #[serde(default)]
    pub expected_bindings: Option<ExpectedBindings>,
}
/// Expected query result, projected to a named subset of variables. The
/// columns named here must all appear in the runner's output; any extra
/// columns (typically per-atom wildcards) are ignored.
#[derive(Debug, Clone, Deserialize)]
pub struct ExpectedBindings {
    /// Output variable names to project the runner's result onto, in the
    /// same order as the cells of each entry in `rows`.
    pub columns: Vec<String>,
    /// Expected tuples, one cell per entry in `columns`. [`verify`] compares
    /// them against the projected output as a multiset: row order is
    /// irrelevant, but duplicate counts must match.
    pub rows: Vec<Vec<JsonValue>>,
}
/// Mirrors `Geolog.DB.Plan.PlanGraph`: a set of nodes plus the id of the
/// rooted result node (the last node in topological order).
#[derive(Debug, Clone, Deserialize)]
pub struct Query {
    /// Id of the node whose result [`execute`] returns; must match the `id`
    /// of some entry in `nodes`.
    pub root: u32,
    /// Plan nodes, in any order; [`execute`] evaluates them in ascending
    /// `id` order.
    pub nodes: Vec<Node>,
}
/// One node of the plan DAG. `id`s don't need to start at any particular
/// value, mirroring the Haskell `PlanNodeId`.
#[derive(Debug, Clone, Deserialize)]
pub struct Node {
    /// Unique node id. It is also the evaluation-order key, so a join may
    /// only reference nodes with a smaller id.
    pub id: u32,
    /// The operation computed at this node.
    pub action: Action,
}
/// What to compute at a node. Tagged externally so JSON reads as
/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Action {
    /// Evaluate one atom pattern against a single input table.
    Scan(Atom),
    /// Combine the results of two previously evaluated nodes.
    Join(Join),
}
/// A flat atom pattern, one entry per column of the target relation.
/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`:
/// `qaValues` positions are filled in directly, and the entity-id column
/// (if any) appears at the last position. Wildcard positions in the
/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a
/// fresh, unique variable name on this side, which the operator binds but
/// never joins against.
#[derive(Debug, Clone, Deserialize)]
pub struct Atom {
    /// Name of the input table to scan (a key of the map passed to
    /// [`execute`]).
    pub table: String,
    /// One term per table column; the length must equal the table's arity,
    /// otherwise [`execute`] reports [`RunError::PatternArityMismatch`].
    pub columns: Vec<JsonTerm>,
}
/// Wire form of a [`Term`]: either a named variable or a literal constant.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonTerm {
    /// Binds the column to a variable; repeating a name across positions
    /// enforces equality between those columns.
    Var(String),
    /// Restricts the column to this constant value.
    Lit(JsonValue),
}
/// Wire-level value tag, restricted to what [`Value`] carries. Entity
/// identities from the Haskell side (`ValEntity path id`) round-trip through
/// `Str` using a `"path:id"` convention; that's a fixture concern, not a
/// runner concern.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonValue {
    /// Integer cell; converts to `Value::Int`.
    Int(i64),
    /// String cell; converts to `Value::Str`.
    Str(String),
}
/// Operator and operands of an [`Action::Join`] node.
#[derive(Debug, Clone, Deserialize)]
pub struct Join {
    /// Which join to perform; see [`JoinOp`] for how each is lowered.
    pub op: JoinOp,
    /// Id of the left input node.
    pub left: u32,
    /// Id of the right input node.
    pub right: u32,
}
/// The join flavours emitted by `Geolog.DB.Plan`, and the `query-ops`
/// operator each one is lowered to by [`execute`].
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinOp {
    /// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns
    /// appear in `right`. Lowered to `semijoin(left, right)`.
    Left,
    /// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared
    /// columns appear in `left`. Lowered to `semijoin(right, left)`.
    Right,
    /// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`.
    Natural,
}
/// Errors produced by [`verify`] when actual bindings don't match the
/// scenario's `expected_bindings` projection.
#[derive(Debug)]
pub enum VerifyError {
    /// An expected column wasn't produced by the plan.
    MissingColumn(String),
    /// An expected row's width didn't match the column count.
    ExpectedRowArity {
        /// Number of entries in `expected_bindings.columns`.
        expected: usize,
        /// Number of cells in the offending expected row.
        got: usize,
    },
    /// The expected and actual rows (after projection) differ as multisets.
    BindingsMismatch {
        /// Oracle rows, converted to [`Value`]s and sorted by their `Debug`
        /// rendering.
        expected: Vec<Vec<Value>>,
        /// Runner rows projected onto the oracle's columns, sorted the same
        /// way.
        actual: Vec<Vec<Value>>,
    },
}
impl std::fmt::Display for VerifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingColumn(name) => {
write!(f, "expected column {name:?} not in plan output")
}
Self::ExpectedRowArity { expected, got } => write!(
f,
"expected row has {got} cells but columns has {expected} entries"
),
Self::BindingsMismatch { expected, actual } => write!(
f,
"bindings mismatch:\n expected: {expected:?}\n actual: {actual:?}"
),
}
}
}
impl std::error::Error for VerifyError {}
/// Cross-check an executed [`Relation`] against a [`Plan`]'s
/// `expected_bindings`. Projects `actual` to the expected columns (so the
/// runner is free to surface wildcard columns the oracle doesn't name) and
/// compares as a multiset.
///
/// Returns `Ok(true)` if the plan carried an oracle and it matched, and
/// `Ok(false)` if there was no oracle (the caller decides whether that's an
/// error).
///
/// # Errors
/// Returns [`VerifyError::MissingColumn`] if an expected column isn't in
/// `actual`, [`VerifyError::ExpectedRowArity`] if an oracle row's width
/// differs from its column list, or [`VerifyError::BindingsMismatch`] if the
/// projected rows differ as multisets.
pub fn verify(plan: &Plan, actual: &Relation) -> Result<bool, VerifyError> {
    let Some(expected) = &plan.expected_bindings else {
        return Ok(false);
    };
    let width = expected.columns.len();

    // Position of each expected column within the runner's output.
    let projection = expected
        .columns
        .iter()
        .map(|col| {
            actual
                .columns
                .iter()
                .position(|c| c == col)
                .ok_or_else(|| VerifyError::MissingColumn(col.clone()))
        })
        .collect::<Result<Vec<usize>, _>>()?;

    // Validate and convert the oracle before doing any work on `actual`.
    let mut expected_proj: Vec<Vec<Value>> = Vec::with_capacity(expected.rows.len());
    for row in &expected.rows {
        if row.len() != width {
            return Err(VerifyError::ExpectedRowArity {
                expected: width,
                got: row.len(),
            });
        }
        expected_proj.push(row.iter().cloned().map(Value::from).collect());
    }

    let mut actual_proj: Vec<Vec<Value>> = actual
        .rows
        .iter()
        .map(|row| projection.iter().map(|&i| row[i].clone()).collect())
        .collect();

    // `Value` is not `Ord`, so both sides are sorted by their `Debug`
    // rendering to compare as multisets. `sort_by_cached_key` formats each
    // row once, instead of once per comparison as `sort_by_key` would.
    actual_proj.sort_by_cached_key(|row| format!("{row:?}"));
    expected_proj.sort_by_cached_key(|row| format!("{row:?}"));

    if actual_proj == expected_proj {
        Ok(true)
    } else {
        Err(VerifyError::BindingsMismatch {
            expected: expected_proj,
            actual: actual_proj,
        })
    }
}
/// Errors a runner can produce during plan validation and execution.
#[derive(Debug)]
pub enum RunError {
    /// A fact references a relation that isn't declared in `schema`.
    /// Produced by [`build_tables`] and [`build_tables_via_storage`]; a scan
    /// of an unknown table is reported as [`RunError::MissingTable`] instead.
    UnknownRelation(String),
    /// A scan refers to a table that wasn't supplied in the input map.
    MissingTable(String),
    /// A fact row's length doesn't match the schema's declared arity.
    ArityMismatch {
        /// Relation the offending row belongs to.
        relation: String,
        /// Arity declared in `schema`.
        expected: usize,
        /// Width of the offending row.
        got: usize,
    },
    /// A scan's atom pattern doesn't match the table's arity.
    PatternArityMismatch {
        /// Table being scanned.
        table: String,
        /// Column count of the table.
        table_arity: usize,
        /// Number of terms in the scan's pattern.
        pattern_arity: usize,
    },
    /// A join node references a node id that doesn't exist.
    MissingNode(u32),
    /// `Query.root` doesn't match any node in `nodes`.
    MissingRoot(u32),
    /// Two nodes share the same id.
    DuplicateNode(u32),
    /// A join node references its left or right side before that side has
    /// been computed: the DAG isn't actually topologically sorted by id, or
    /// it has a cycle.
    UnresolvedDependency { node: u32, depends_on: u32 },
    /// A [`Storage`] backend used to materialize tables returned an error.
    Storage(StorageError),
}
impl From<StorageError> for RunError {
fn from(err: StorageError) -> Self {
Self::Storage(err)
}
}
impl std::fmt::Display for RunError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownRelation(name) => {
write!(f, "facts reference relation {name:?} not in schema")
}
Self::MissingTable(name) => write!(f, "scan references missing table {name:?}"),
Self::ArityMismatch {
relation,
expected,
got,
} => write!(
f,
"relation {relation:?}: row arity {got} differs from schema arity {expected}"
),
Self::PatternArityMismatch {
table,
table_arity,
pattern_arity,
} => write!(
f,
"scan of {table:?}: pattern has {pattern_arity} columns, table has {table_arity}"
),
Self::MissingNode(id) => write!(f, "plan references missing node id {id}"),
Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"),
Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"),
Self::UnresolvedDependency { node, depends_on } => write!(
f,
"node {node} depends on {depends_on}, which has not been computed yet"
),
Self::Storage(err) => write!(f, "storage backend error: {err}"),
}
}
}
impl std::error::Error for RunError {}
impl From<JsonValue> for Value {
fn from(jv: JsonValue) -> Self {
match jv {
JsonValue::Int(n) => Self::Int(n),
JsonValue::Str(s) => Self::Str(s),
}
}
}
impl From<JsonTerm> for Term {
fn from(t: JsonTerm) -> Self {
match t {
JsonTerm::Var(name) => Self::Var(name),
JsonTerm::Lit(value) => Self::Lit(value.into()),
}
}
}
/// Deserialize a [`Plan`] from JSON text.
///
/// # Errors
/// Propagates the [`serde_json::Error`] raised when `json` is malformed or
/// doesn't have the shape of a [`Plan`].
pub fn parse_plan(json: &str) -> Result<Plan, serde_json::Error> {
    serde_json::from_str::<Plan>(json)
}
/// Materialize one in-memory [`Table`] per relation in `plan.schema`, filled
/// with that relation's rows from `plan.facts`. A declared relation with no
/// facts yields an empty table at its declared arity.
///
/// # Errors
/// Fails with [`RunError::UnknownRelation`] when `facts` names a relation
/// missing from `schema`, and with [`RunError::ArityMismatch`] when a fact
/// row's width differs from the declared arity.
pub fn build_tables(plan: &Plan) -> Result<HashMap<String, Table>, RunError> {
    let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
    for (relation, &arity) in &plan.schema {
        tables.insert(relation.clone(), Table::new(arity));
    }

    for (relation, fact_rows) in &plan.facts {
        let table = tables
            .get_mut(relation)
            .ok_or_else(|| RunError::UnknownRelation(relation.clone()))?;
        for fact in fact_rows {
            let got = fact.len();
            if got != table.arity {
                return Err(RunError::ArityMismatch {
                    relation: relation.clone(),
                    expected: table.arity,
                    got,
                });
            }
            let row: Vec<Value> = fact.iter().cloned().map(Value::from).collect();
            table.push(row);
        }
    }

    Ok(tables)
}
/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then
/// materialize each declared relation back into an in-memory [`Table`] via
/// [`scan_as_table`]. The returned map is the same shape [`execute`]
/// consumes, so this is the storage-backed analogue of [`build_tables`].
///
/// All facts are validated against the schema *before* the backend is
/// touched. An invalid plan therefore creates no relations and opens no
/// transaction.
///
/// Adding a new backend means constructing a different `S` at the call
/// site; the body here doesn't need to change.
///
/// # Errors
/// Returns [`RunError::UnknownRelation`] or [`RunError::ArityMismatch`] on
/// the same conditions as [`build_tables`], or [`RunError::Storage`] when
/// the backend itself rejects an operation.
pub fn build_tables_via_storage<S: Storage>(
    plan: &Plan,
    storage: &mut S,
) -> Result<HashMap<String, Table>, RunError> {
    // Reject malformed facts up front so a bad plan can't leave `storage`
    // with freshly created relations or a dropped, uncommitted transaction.
    validate_facts(plan)?;

    for (name, arity) in &plan.schema {
        storage.create_relation(name, *arity)?;
    }
    {
        let mut tx = storage.transaction()?;
        for (name, rows) in &plan.facts {
            for row in rows {
                let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
                tx.insert(name, cells)?;
            }
        }
        tx.commit()?;
    }

    let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
    for name in plan.schema.keys() {
        let table = scan_as_table(storage as &dyn Storage, name)?;
        tables.insert(name.clone(), table);
    }
    Ok(tables)
}

/// Check every fact row in `plan` against its declared arity, without side
/// effects. Returns the first [`RunError::UnknownRelation`] or
/// [`RunError::ArityMismatch`] found.
fn validate_facts(plan: &Plan) -> Result<(), RunError> {
    for (name, rows) in &plan.facts {
        let Some(&arity) = plan.schema.get(name) else {
            return Err(RunError::UnknownRelation(name.clone()));
        };
        if let Some(row) = rows.iter().find(|row| row.len() != arity) {
            return Err(RunError::ArityMismatch {
                relation: name.clone(),
                expected: arity,
                got: row.len(),
            });
        }
    }
    Ok(())
}
/// Execute a query DAG against the supplied input tables, returning the
/// bindings [`Relation`] for the rooted plan node.
///
/// Nodes are executed in ascending `id` order. For a Yannakakis plan as
/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort,
/// since `insertJoin` only references node ids that have already been
/// allocated. A non-monotone id ordering is rejected with
/// [`RunError::UnresolvedDependency`].
///
/// # Errors
/// Returns [`RunError::DuplicateNode`] for repeated ids,
/// [`RunError::MissingNode`] for join references to unknown ids,
/// [`RunError::MissingRoot`] if `query.root` isn't present,
/// [`RunError::MissingTable`] if a scan references a table not in the map,
/// or [`RunError::PatternArityMismatch`] if a scan's pattern doesn't match
/// the table's arity.
pub fn execute<S: std::hash::BuildHasher>(
tables: &HashMap<String, Table, S>,
query: &Query,
) -> Result<Relation, RunError> {
let mut seen_ids: std::collections::HashSet<u32> =
std::collections::HashSet::with_capacity(query.nodes.len());
for node in &query.nodes {
if !seen_ids.insert(node.id) {
return Err(RunError::DuplicateNode(node.id));
}
}
if !seen_ids.contains(&query.root) {
return Err(RunError::MissingRoot(query.root));
}
let mut ordered: Vec<&Node> = query.nodes.iter().collect();
ordered.sort_by_key(|n| n.id);
let mut results: HashMap<u32, Relation> = HashMap::with_capacity(ordered.len());
for node in ordered {
let computed = match &node.action {
Action::Scan(atom) => {
let table = tables
.get(&atom.table)
.ok_or_else(|| RunError::MissingTable(atom.table.clone()))?;
if atom.columns.len() != table.arity {
return Err(RunError::PatternArityMismatch {
table: atom.table.clone(),
table_arity: table.arity,
pattern_arity: atom.columns.len(),
});
}
let pattern = AtomPattern {
columns: atom.columns.iter().cloned().map(Term::from).collect(),
};
scan_atom(table, &pattern)
}
Action::Join(join) => {
let left = require_dep(&results, &seen_ids, node.id, join.left)?;
let right = require_dep(&results, &seen_ids, node.id, join.right)?;
match join.op {
JoinOp::Left => semijoin(left, right),
JoinOp::Right => semijoin(right, left),
JoinOp::Natural => natural_join(left, right),
}
}
};
results.insert(node.id, computed);
}
results
.remove(&query.root)
.ok_or(RunError::MissingRoot(query.root))
}
fn require_dep<'a>(
results: &'a HashMap<u32, Relation>,
seen: &std::collections::HashSet<u32>,
node: u32,
depends_on: u32,
) -> Result<&'a Relation, RunError> {
if let Some(rel) = results.get(&depends_on) {
Ok(rel)
} else if seen.contains(&depends_on) {
Err(RunError::UnresolvedDependency { node, depends_on })
} else {
Err(RunError::MissingNode(depends_on))
}
}
/// One-shot helper: parse `json` as a [`Plan`], build its input tables from
/// the embedded facts with [`build_tables`], and [`execute`] the query. The
/// result is the root binding relation.
///
/// # Errors
/// [`RunFromJsonError::Parse`] if the JSON is malformed or mis-shaped, or
/// [`RunFromJsonError::Run`] if table construction or execution fails.
pub fn run_json(json: &str) -> Result<Relation, RunFromJsonError> {
    let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?;
    build_tables(&plan)
        .and_then(|tables| execute(&tables, &plan.query))
        .map_err(RunFromJsonError::Run)
}
/// Combined error from [`run_json`].
#[derive(Debug)]
pub enum RunFromJsonError {
    /// The input wasn't valid JSON in the shape of a [`Plan`].
    Parse(serde_json::Error),
    /// Building the input tables or executing the query failed.
    Run(RunError),
}
impl std::fmt::Display for RunFromJsonError {
    /// Prefix the wrapped error's message with the stage that failed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (stage, detail) = match self {
            Self::Parse(err) => ("parse", err as &dyn std::fmt::Display),
            Self::Run(err) => ("run", err as &dyn std::fmt::Display),
        };
        write!(f, "{stage} error: {detail}")
    }
}

impl std::error::Error for RunFromJsonError {
    /// Expose the wrapped parse or run error as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::Parse(err) => err,
            Self::Run(err) => err,
        };
        Some(cause)
    }
}