541 lines
20 KiB
Rust
541 lines
20 KiB
Rust
//! Snapshot executor for conjunctive-query plans.
|
|
//!
|
|
//! Takes a structural plan (a DAG of `Scan` and `Join` nodes), the input
|
|
//! tables it scans, and walks the DAG via [`query_ops::atom::scan_atom`],
|
|
//! [`query_ops::join::semijoin`], and [`query_ops::join::natural_join`].
|
|
//! The result is a binding [`Relation`](query_ops::relation::Relation) over
|
|
//! the query's variables.
|
|
//!
|
|
//! The executor is intentionally backend-agnostic: [`execute`] consumes plain
//! in-memory [`Table`](storage::table::Table)s and evaluates them with
//! `query-ops`, so the planner that emits the JSON IR stays decoupled from
//! the storage backend that produced the facts. To execute a plan against
//! a [`Storage`](storage::Storage) backend, materialize each input table
//! with [`storage::scan_as_table`] (or use [`build_tables_via_storage`],
//! which does this for every relation in a plan's schema) and call
//! [`execute`] with the resulting map. The in-tree
//! `tests/storage_roundtrip.rs` is the canonical example.
|
|
//!
|
|
//! The JSON IR mirrors `Geolog.DB.Plan.PlanGraph` and
|
|
//! `Geolog.DB.InMemory.QAtom` from the `external/geolog` submodule, but the
|
|
//! shape is the contract: any frontend that emits this JSON can use the
|
|
//! runner.
|
|
//!
|
|
//! Operator mapping from the Haskell planner:
|
|
//!
|
|
//! | `Geolog.DB.Plan` | this crate |
|
|
//! |-----------------------------|-----------------------------------------------|
|
|
//! | `PlanEvalAtom` | [`Action::Scan`] → `scan_atom` |
|
|
//! | `PlanJoin LeftJoin a b` | [`Action::Join`] with [`JoinOp::Left`] → `semijoin(rel[a], rel[b])` |
|
|
//! | `PlanJoin RightJoin a b` | [`Action::Join`] with [`JoinOp::Right`] → `semijoin(rel[b], rel[a])` |
|
|
//! | `PlanJoin NaturalJoin a b` | [`Action::Join`] with [`JoinOp::Natural`] → `natural_join(rel[a], rel[b])` |
|
|
//!
|
|
//! The atom side covers `evalAtom` (`Geolog.DB.InMemory`): a [`Term::Var`]
|
|
//! repeated across positions enforces equality, [`Term::Lit`] filters by
|
|
//! constant, and distinct variables project in first-occurrence order.
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use serde::Deserialize;
|
|
|
|
use query_ops::atom::{AtomPattern, Term, scan_atom};
|
|
use query_ops::join::{natural_join, semijoin};
|
|
use query_ops::relation::Relation;
|
|
use storage::table::Table;
|
|
use storage::value::Value;
|
|
use storage::{Storage, StorageError, scan_as_table};
|
|
|
|
/// A single fixture: schema, ground facts, and a query plan to execute.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct Plan {
|
|
/// Relation name → arity (column count).
|
|
pub schema: HashMap<String, usize>,
|
|
/// Relation name → list of ground tuples to insert before execution.
|
|
pub facts: HashMap<String, Vec<Vec<JsonValue>>>,
|
|
/// The join DAG itself.
|
|
pub query: Query,
|
|
/// Optional oracle: if present, [`verify`] cross-checks an executed
|
|
/// [`Relation`] against this projection. The exporter lifts the
|
|
/// scenario's `expected_bindings` block into this field.
|
|
#[serde(default)]
|
|
pub expected_bindings: Option<ExpectedBindings>,
|
|
}
|
|
|
|
/// Expected query result, projected to a named subset of variables. The
|
|
/// columns named here must all appear in the runner's output; any extra
|
|
/// columns (typically per-atom wildcards) are ignored.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct ExpectedBindings {
|
|
pub columns: Vec<String>,
|
|
pub rows: Vec<Vec<JsonValue>>,
|
|
}
|
|
|
|
/// Mirrors `Geolog.DB.Plan.PlanGraph`: a set of nodes plus the id of the
|
|
/// rooted result node (the last node in topological order).
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct Query {
|
|
pub root: u32,
|
|
pub nodes: Vec<Node>,
|
|
}
|
|
|
|
/// One node of the plan DAG. `id`s don't need to start at any particular
|
|
/// value, mirroring the Haskell `PlanNodeId`.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct Node {
|
|
pub id: u32,
|
|
pub action: Action,
|
|
}
|
|
|
|
/// What to compute at a node. Tagged externally so JSON reads as
|
|
/// `{"action": {"scan": {...}}}` or `{"action": {"join": {...}}}`.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum Action {
|
|
Scan(Atom),
|
|
Join(Join),
|
|
}
|
|
|
|
/// A flat atom pattern, one entry per column of the target relation.
|
|
/// Matches the `toFlatArgs` view used by `Geolog.DB.InMemory.evalAtom`:
|
|
/// `qaValues` positions are filled in directly, and the entity-id column
|
|
/// (if any) appears at the last position. Wildcard positions in the
|
|
/// Haskell `QAtom` (a `Map Int QVal` with a missing key) translate to a
|
|
/// fresh, unique variable name on this side, which the operator binds but
|
|
/// never joins against.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct Atom {
|
|
pub table: String,
|
|
pub columns: Vec<JsonTerm>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum JsonTerm {
|
|
Var(String),
|
|
Lit(JsonValue),
|
|
}
|
|
|
|
/// Wire-level value tag. Restricted to what
|
|
/// [`storage::value::Value`](storage::value::Value) carries. Entity identities from
|
|
/// the Haskell side (`ValEntity path id`) round-trip through `Str` using a
|
|
/// `"path:id"` convention; that's a fixture concern, not a runner concern.
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum JsonValue {
|
|
Int(i64),
|
|
Str(String),
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
pub struct Join {
|
|
pub op: JoinOp,
|
|
pub left: u32,
|
|
pub right: u32,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Deserialize)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum JoinOp {
|
|
/// `Geolog.DB.Plan.LeftJoin`: result is `left` rows whose shared columns
|
|
/// appear in `right`. Lowered to `semijoin(left, right)`.
|
|
Left,
|
|
/// `Geolog.DB.Plan.RightJoin`: result is `right` rows whose shared
|
|
/// columns appear in `left`. Lowered to `semijoin(right, left)`.
|
|
Right,
|
|
/// `Geolog.DB.Plan.NaturalJoin`. Lowered to `natural_join(left, right)`.
|
|
Natural,
|
|
}
|
|
|
|
/// Errors produced by [`verify`] when actual bindings don't match the
|
|
/// scenario's `expected_bindings` projection.
|
|
#[derive(Debug)]
|
|
pub enum VerifyError {
|
|
/// An expected column wasn't produced by the plan.
|
|
MissingColumn(String),
|
|
/// An expected row's width didn't match the column count.
|
|
ExpectedRowArity { expected: usize, got: usize },
|
|
/// The expected and actual rows (after projection) differ as multisets.
|
|
BindingsMismatch {
|
|
expected: Vec<Vec<Value>>,
|
|
actual: Vec<Vec<Value>>,
|
|
},
|
|
}
|
|
|
|
impl std::fmt::Display for VerifyError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::MissingColumn(name) => {
|
|
write!(f, "expected column {name:?} not in plan output")
|
|
}
|
|
Self::ExpectedRowArity { expected, got } => write!(
|
|
f,
|
|
"expected row has {got} cells but columns has {expected} entries"
|
|
),
|
|
Self::BindingsMismatch { expected, actual } => write!(
|
|
f,
|
|
"bindings mismatch:\n expected: {expected:?}\n actual: {actual:?}"
|
|
),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for VerifyError {}
|
|
|
|
/// Cross-check an executed [`Relation`] against a [`Plan`]'s
|
|
/// `expected_bindings`. Projects `actual` to the expected columns (so the
|
|
/// runner is free to surface wildcard columns the oracle doesn't name) and
|
|
/// compares as a multiset.
|
|
///
|
|
/// Returns `Ok(true)` if the plan carried an oracle and it matched,
|
|
/// `Ok(false)` if there was no oracle (caller decides whether that's an
|
|
/// error). Returns [`VerifyError`] on mismatch.
|
|
///
|
|
/// # Errors
|
|
/// See [`VerifyError`].
|
|
pub fn verify(plan: &Plan, actual: &Relation) -> Result<bool, VerifyError> {
|
|
let Some(expected) = &plan.expected_bindings else {
|
|
return Ok(false);
|
|
};
|
|
let mut projection: Vec<usize> = Vec::with_capacity(expected.columns.len());
|
|
for col in &expected.columns {
|
|
let idx = actual
|
|
.columns
|
|
.iter()
|
|
.position(|c| c == col)
|
|
.ok_or_else(|| VerifyError::MissingColumn(col.clone()))?;
|
|
projection.push(idx);
|
|
}
|
|
let mut actual_proj: Vec<Vec<Value>> = actual
|
|
.rows
|
|
.iter()
|
|
.map(|row| projection.iter().map(|&i| row[i].clone()).collect())
|
|
.collect();
|
|
let mut expected_proj: Vec<Vec<Value>> = Vec::with_capacity(expected.rows.len());
|
|
for row in &expected.rows {
|
|
if row.len() != expected.columns.len() {
|
|
return Err(VerifyError::ExpectedRowArity {
|
|
expected: expected.columns.len(),
|
|
got: row.len(),
|
|
});
|
|
}
|
|
expected_proj.push(row.iter().cloned().map(Value::from).collect());
|
|
}
|
|
// Value is not Ord; use Debug-derived sort keys to compare as a multiset.
|
|
let key = |row: &[Value]| -> String { format!("{row:?}") };
|
|
actual_proj.sort_by_key(|r| key(r));
|
|
expected_proj.sort_by_key(|r| key(r));
|
|
if actual_proj == expected_proj {
|
|
Ok(true)
|
|
} else {
|
|
Err(VerifyError::BindingsMismatch {
|
|
expected: expected_proj,
|
|
actual: actual_proj,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Errors a runner can produce during plan validation and execution.
|
|
#[derive(Debug)]
|
|
pub enum RunError {
|
|
/// A fact or scan references a relation that isn't declared in `schema`.
|
|
UnknownRelation(String),
|
|
/// A scan refers to a table that wasn't supplied in the input map.
|
|
MissingTable(String),
|
|
/// A fact row's length doesn't match the schema's declared arity.
|
|
ArityMismatch {
|
|
relation: String,
|
|
expected: usize,
|
|
got: usize,
|
|
},
|
|
/// A scan's atom pattern doesn't match the table's arity.
|
|
PatternArityMismatch {
|
|
table: String,
|
|
table_arity: usize,
|
|
pattern_arity: usize,
|
|
},
|
|
/// A join node references a node id that doesn't exist.
|
|
MissingNode(u32),
|
|
/// `Query.root` doesn't match any node in `nodes`.
|
|
MissingRoot(u32),
|
|
/// Two nodes share the same id.
|
|
DuplicateNode(u32),
|
|
/// A join node references its left or right side before that side has
|
|
/// been computed: the DAG isn't actually topologically sorted by id, or
|
|
/// it has a cycle.
|
|
UnresolvedDependency { node: u32, depends_on: u32 },
|
|
/// A [`Storage`] backend used to materialize tables returned an error.
|
|
Storage(StorageError),
|
|
}
|
|
|
|
impl From<StorageError> for RunError {
|
|
fn from(err: StorageError) -> Self {
|
|
Self::Storage(err)
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Display for RunError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::UnknownRelation(name) => {
|
|
write!(f, "facts reference relation {name:?} not in schema")
|
|
}
|
|
Self::MissingTable(name) => write!(f, "scan references missing table {name:?}"),
|
|
Self::ArityMismatch {
|
|
relation,
|
|
expected,
|
|
got,
|
|
} => write!(
|
|
f,
|
|
"relation {relation:?}: row arity {got} differs from schema arity {expected}"
|
|
),
|
|
Self::PatternArityMismatch {
|
|
table,
|
|
table_arity,
|
|
pattern_arity,
|
|
} => write!(
|
|
f,
|
|
"scan of {table:?}: pattern has {pattern_arity} columns, table has {table_arity}"
|
|
),
|
|
Self::MissingNode(id) => write!(f, "plan references missing node id {id}"),
|
|
Self::MissingRoot(id) => write!(f, "plan root id {id} matches no node"),
|
|
Self::DuplicateNode(id) => write!(f, "duplicate node id {id} in plan"),
|
|
Self::UnresolvedDependency { node, depends_on } => write!(
|
|
f,
|
|
"node {node} depends on {depends_on}, which has not been computed yet"
|
|
),
|
|
Self::Storage(err) => write!(f, "storage backend error: {err}"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for RunError {}
|
|
|
|
impl From<JsonValue> for Value {
|
|
fn from(jv: JsonValue) -> Self {
|
|
match jv {
|
|
JsonValue::Int(n) => Self::Int(n),
|
|
JsonValue::Str(s) => Self::Str(s),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<JsonTerm> for Term {
|
|
fn from(t: JsonTerm) -> Self {
|
|
match t {
|
|
JsonTerm::Var(name) => Self::Var(name),
|
|
JsonTerm::Lit(value) => Self::Lit(value.into()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Parse a [`Plan`] from a JSON string.
|
|
///
|
|
/// # Errors
|
|
/// Returns a [`serde_json::Error`] if the input isn't valid JSON in the
|
|
/// expected shape.
|
|
pub fn parse_plan(json: &str) -> Result<Plan, serde_json::Error> {
|
|
serde_json::from_str(json)
|
|
}
|
|
|
|
/// Build the input [`Table`] for each relation declared in a [`Plan`]'s
|
|
/// schema, populating rows from the plan's `facts` map. Relations with no
|
|
/// facts get an empty table at the declared arity.
|
|
///
|
|
/// # Errors
|
|
/// Returns [`RunError::UnknownRelation`] if `facts` mentions a relation
|
|
/// not in `schema`, or [`RunError::ArityMismatch`] if a row's width doesn't
|
|
/// match the declared arity.
|
|
pub fn build_tables(plan: &Plan) -> Result<HashMap<String, Table>, RunError> {
|
|
let mut tables: HashMap<String, Table> = plan
|
|
.schema
|
|
.iter()
|
|
.map(|(name, arity)| (name.clone(), Table::new(*arity)))
|
|
.collect();
|
|
for (name, rows) in &plan.facts {
|
|
let Some(table) = tables.get_mut(name) else {
|
|
return Err(RunError::UnknownRelation(name.clone()));
|
|
};
|
|
for row in rows {
|
|
if row.len() != table.arity {
|
|
return Err(RunError::ArityMismatch {
|
|
relation: name.clone(),
|
|
expected: table.arity,
|
|
got: row.len(),
|
|
});
|
|
}
|
|
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
|
|
table.push(cells);
|
|
}
|
|
}
|
|
Ok(tables)
|
|
}
|
|
|
|
/// Populate a [`Storage`] backend from a [`Plan`]'s schema and facts, then
|
|
/// materialize each declared relation back into an in-memory [`Table`] via
|
|
/// [`scan_as_table`]. The returned map is the same shape [`execute`]
|
|
/// consumes, so this is the storage-backed analogue of [`build_tables`].
|
|
///
|
|
/// Adding a new backend means constructing a different `S` at the call
|
|
/// site; the body here doesn't need to change.
|
|
///
|
|
/// # Errors
|
|
/// Returns [`RunError::UnknownRelation`] or [`RunError::ArityMismatch`] on
|
|
/// the same conditions as [`build_tables`], or [`RunError::Storage`] when
|
|
/// the backend itself rejects an operation.
|
|
pub fn build_tables_via_storage<S: Storage>(
|
|
plan: &Plan,
|
|
storage: &mut S,
|
|
) -> Result<HashMap<String, Table>, RunError> {
|
|
for (name, arity) in &plan.schema {
|
|
storage.create_relation(name, *arity)?;
|
|
}
|
|
{
|
|
let mut tx = storage.transaction()?;
|
|
for (name, rows) in &plan.facts {
|
|
let Some(&arity) = plan.schema.get(name) else {
|
|
return Err(RunError::UnknownRelation(name.clone()));
|
|
};
|
|
for row in rows {
|
|
if row.len() != arity {
|
|
return Err(RunError::ArityMismatch {
|
|
relation: name.clone(),
|
|
expected: arity,
|
|
got: row.len(),
|
|
});
|
|
}
|
|
let cells: Vec<Value> = row.iter().cloned().map(Value::from).collect();
|
|
tx.insert(name, cells)?;
|
|
}
|
|
}
|
|
tx.commit()?;
|
|
}
|
|
let mut tables: HashMap<String, Table> = HashMap::with_capacity(plan.schema.len());
|
|
for name in plan.schema.keys() {
|
|
let table = scan_as_table(storage as &dyn Storage, name)?;
|
|
tables.insert(name.clone(), table);
|
|
}
|
|
Ok(tables)
|
|
}
|
|
|
|
/// Execute a query DAG against the supplied input tables, returning the
|
|
/// bindings [`Relation`] for the rooted plan node.
|
|
///
|
|
/// Nodes are executed in ascending `id` order. For a Yannakakis plan as
|
|
/// emitted by `Geolog.DB.Plan` this is equivalent to a topological sort,
|
|
/// since `insertJoin` only references node ids that have already been
|
|
/// allocated. A non-monotone id ordering is rejected with
|
|
/// [`RunError::UnresolvedDependency`].
|
|
///
|
|
/// # Errors
|
|
/// Returns [`RunError::DuplicateNode`] for repeated ids,
|
|
/// [`RunError::MissingNode`] for join references to unknown ids,
|
|
/// [`RunError::MissingRoot`] if `query.root` isn't present,
|
|
/// [`RunError::MissingTable`] if a scan references a table not in the map,
|
|
/// or [`RunError::PatternArityMismatch`] if a scan's pattern doesn't match
|
|
/// the table's arity.
|
|
pub fn execute<S: std::hash::BuildHasher>(
|
|
tables: &HashMap<String, Table, S>,
|
|
query: &Query,
|
|
) -> Result<Relation, RunError> {
|
|
let mut seen_ids: std::collections::HashSet<u32> =
|
|
std::collections::HashSet::with_capacity(query.nodes.len());
|
|
for node in &query.nodes {
|
|
if !seen_ids.insert(node.id) {
|
|
return Err(RunError::DuplicateNode(node.id));
|
|
}
|
|
}
|
|
if !seen_ids.contains(&query.root) {
|
|
return Err(RunError::MissingRoot(query.root));
|
|
}
|
|
|
|
let mut ordered: Vec<&Node> = query.nodes.iter().collect();
|
|
ordered.sort_by_key(|n| n.id);
|
|
|
|
let mut results: HashMap<u32, Relation> = HashMap::with_capacity(ordered.len());
|
|
for node in ordered {
|
|
let computed = match &node.action {
|
|
Action::Scan(atom) => {
|
|
let table = tables
|
|
.get(&atom.table)
|
|
.ok_or_else(|| RunError::MissingTable(atom.table.clone()))?;
|
|
if atom.columns.len() != table.arity {
|
|
return Err(RunError::PatternArityMismatch {
|
|
table: atom.table.clone(),
|
|
table_arity: table.arity,
|
|
pattern_arity: atom.columns.len(),
|
|
});
|
|
}
|
|
let pattern = AtomPattern {
|
|
columns: atom.columns.iter().cloned().map(Term::from).collect(),
|
|
};
|
|
scan_atom(table, &pattern)
|
|
}
|
|
Action::Join(join) => {
|
|
let left = require_dep(&results, &seen_ids, node.id, join.left)?;
|
|
let right = require_dep(&results, &seen_ids, node.id, join.right)?;
|
|
match join.op {
|
|
JoinOp::Left => semijoin(left, right),
|
|
JoinOp::Right => semijoin(right, left),
|
|
JoinOp::Natural => natural_join(left, right),
|
|
}
|
|
}
|
|
};
|
|
results.insert(node.id, computed);
|
|
}
|
|
|
|
results
|
|
.remove(&query.root)
|
|
.ok_or(RunError::MissingRoot(query.root))
|
|
}
|
|
|
|
fn require_dep<'a>(
|
|
results: &'a HashMap<u32, Relation>,
|
|
seen: &std::collections::HashSet<u32>,
|
|
node: u32,
|
|
depends_on: u32,
|
|
) -> Result<&'a Relation, RunError> {
|
|
if let Some(rel) = results.get(&depends_on) {
|
|
Ok(rel)
|
|
} else if seen.contains(&depends_on) {
|
|
Err(RunError::UnresolvedDependency { node, depends_on })
|
|
} else {
|
|
Err(RunError::MissingNode(depends_on))
|
|
}
|
|
}
|
|
|
|
/// Convenience: parse JSON, build tables from the embedded facts, and
|
|
/// execute, returning the root binding relation. Equivalent to
|
|
/// `parse_plan` + [`build_tables`] + [`execute`].
|
|
///
|
|
/// # Errors
|
|
/// Returns a JSON parse error if the input is malformed, or a [`RunError`]
|
|
/// for any later step.
|
|
pub fn run_json(json: &str) -> Result<Relation, RunFromJsonError> {
|
|
let plan = parse_plan(json).map_err(RunFromJsonError::Parse)?;
|
|
let tables = build_tables(&plan).map_err(RunFromJsonError::Run)?;
|
|
let bindings = execute(&tables, &plan.query).map_err(RunFromJsonError::Run)?;
|
|
Ok(bindings)
|
|
}
|
|
|
|
/// Combined error from [`run_json`].
|
|
#[derive(Debug)]
|
|
pub enum RunFromJsonError {
|
|
Parse(serde_json::Error),
|
|
Run(RunError),
|
|
}
|
|
|
|
impl std::fmt::Display for RunFromJsonError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::Parse(err) => write!(f, "parse error: {err}"),
|
|
Self::Run(err) => write!(f, "run error: {err}"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for RunFromJsonError {
|
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
match self {
|
|
Self::Parse(err) => Some(err),
|
|
Self::Run(err) => Some(err),
|
|
}
|
|
}
|
|
}
|