From 57a6eaaef69b09bfa3d71620750392d825c90993 Mon Sep 17 00:00:00 2001 From: Hassan Abedi Date: Mon, 13 Apr 2026 11:01:30 +0200 Subject: [PATCH] Add Skolem chase and semi-naive evaluation support --- AGENTS.md | 2 +- README.md | 6 +- ROADMAP.md | 10 +- src/chase/engine.rs | 320 ++++++++++++++++++++++++-- src/chase/inference.rs | 145 ++++++++++++ src/chase/mod.rs | 2 +- src/execution/mod.rs | 4 + src/execution/physical.rs | 470 ++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- 9 files changed, 937 insertions(+), 24 deletions(-) create mode 100644 src/execution/physical.rs diff --git a/AGENTS.md b/AGENTS.md index 1a9918f..74ae1a1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -59,7 +59,7 @@ Quick examples: - `src/catalog/`: predicate-to-table schema inference and catalog access. - `src/sql/`: narrow SQL AST and parser support. - `src/planner/`: logical plan structures and SQL-to-plan translation. -- `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait and the `TableStore` in-memory source. +- `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait, the `TableStore` in-memory source, and the physical operator layer in `physical.rs` with rule-based rewrites. - `examples/scripts/`: runnable script examples for supported workflows. - `tests/`: integration, regression, and property-based tests. diff --git a/README.md b/README.md index 68ad8a5..3ee9d04 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,12 @@ execution boundaries. ### Current scope - Chase-based rule evaluation over facts, rules, and substitutions -- Restricted-chase style materialization with active-trigger checks +- Restricted, standard, oblivious, and Skolem chase variants +- Optional semi-naive evaluation across all chase variants - Provenance-oriented explanations for derived answers - Script, REPL, and local web UI for experimentation - Relational schema, catalog, logical-plan, and execution scaffolding +- Physical operator scaffolding with a small rule-based rewrite layer - A minimal SQL slice for `SELECT-FROM-WHERE-ORDER BY-LIMIT` queries over predicate-backed tables ### Architecture @@ -26,7 +28,7 @@ The repository is currently organized around a few clear subsystems: - `src/catalog/`: predicate-backed table metadata - `src/sql/`: minimal SQL AST and parser - `src/planner/`: logical plan structures and SQL-to-plan translation -- `src/execution/`: execution for the current logical-plan subset, `DataSource` trait, and `TableStore` +- `src/execution/`: execution for the current logical-plan subset, the `DataSource` trait, the `TableStore`, and a physical operator layer with rule-based rewrites Today, the chase subsystem is still the most mature part of the codebase. The relational and SQL modules are present to create clean extension points for a diff --git a/ROADMAP.md b/ROADMAP.md index 14b665b..2851e4e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -65,9 +65,9 @@ This document tracks the current state and next steps for the repository. ### Execution and Optimization -- [ ] Introduce physical operator abstractions +- [x] Introduce physical operator abstractions - [x] Add a planning step from logical operators to executable operators -- [ ] Add basic rule-based logical rewrites +- [x] Add basic rule-based logical rewrites - [ ] Add statistics and cost-model scaffolding - [ ] Add indexing and access-path abstractions @@ -76,13 +76,13 @@ This document tracks the current state and next steps for the repository. - [x] Restricted chase - [x] Standard chase - [x] Oblivious chase -- [ ] Skolem chase +- [x] Skolem chase - [ ] Core chase - [ ] Negative constraints - [ ] Stratified negation in rule bodies - [ ] Disjunctive heads - [ ] Aggregation support in rule evaluation -- [ ] Semi-naive evaluation +- [x] Semi-naive evaluation - [ ] Termination analysis helpers ### Data and Interoperability @@ -95,7 +95,7 @@ This document tracks the current state and next steps for the repository. ### Performance and Reliability -- [ ] Predicate indexing for fact lookup +- [x] Predicate indexing for fact lookup - [ ] Incremental evaluation - [ ] Benchmarks - [ ] Fuzzing diff --git a/src/chase/engine.rs b/src/chase/engine.rs index 88cd2f8..3bdda57 100644 --- a/src/chase/engine.rs +++ b/src/chase/engine.rs @@ -5,7 +5,10 @@ use std::error::Error; use std::fmt; use super::atom::Atom; -use super::inference::{NullGenerator, Trigger, apply_rule_head, find_matches, head_is_satisfied}; +use super::inference::{ + NullGenerator, SkolemGenerator, Trigger, apply_rule_head, apply_rule_head_skolem, find_matches, + find_matches_for_step, head_is_satisfied, +}; use super::instance::Instance; use super::rule::{Egd, Rule}; use super::substitution::Substitution; @@ -70,6 +73,13 @@ pub enum ChaseVariant { /// variables this variant will typically not terminate (it will hit the /// step limit) because each application generates fresh nulls. Oblivious, + /// Skolem chase: fires every matching rule application, like oblivious, + /// but binds each existential variable to a deterministic (skolem) null + /// keyed on the rule identity, the existential variable, and the + /// frontier-variable bindings. Re-application with the same frontier + /// bindings produces the same head atom, so the chase terminates for many + /// schemas where the oblivious variant does not. + Skolem, } /// Configuration for the chase algorithm. @@ -79,6 +89,11 @@ pub struct ChaseConfig { pub max_steps: usize, /// The chase variant to use. pub variant: ChaseVariant, + /// When true, the chase uses semi-naive evaluation: each round only + /// considers rule applications that involve at least one fact added in + /// the previous round, instead of re-matching the entire instance. This + /// is a pure performance switch and does not change the chase result. + pub semi_naive: bool, } impl Default for ChaseConfig { @@ -86,6 +101,7 @@ impl Default for ChaseConfig { ChaseConfig { max_steps: 10_000, variant: ChaseVariant::default(), + semi_naive: false, } } } @@ -138,6 +154,22 @@ pub fn oblivious_chase(instance: Instance, rules: &[Rule]) -> ChaseResult { chase_with_config(instance, rules, config) } +/// Run the Skolem chase algorithm. +/// +/// The Skolem chase fires every matching rule application, like the +/// oblivious variant, but binds each existential variable to a deterministic +/// null keyed on the rule, the variable, and the frontier-variable bindings. +/// Re-application with the same frontier bindings reuses the same null, so +/// the chase terminates whenever the set of derivable facts is finite, even +/// in the presence of existentials. +pub fn skolem_chase(instance: Instance, rules: &[Rule]) -> ChaseResult { + let config = ChaseConfig { + variant: ChaseVariant::Skolem, + ..Default::default() + }; + chase_with_config(instance, rules, config) +} + /// Run the chase with custom configuration. pub fn chase_with_config( mut instance: Instance, @@ -145,8 +177,16 @@ pub fn chase_with_config( config: ChaseConfig, ) -> ChaseResult { let mut null_gen = NullGenerator::seeded_from(&instance, rules); + let mut skolem_gen = SkolemGenerator::seeded_from(&instance, rules); let mut applied_triggers: HashSet = HashSet::new(); let mut steps = 0; + // For semi-naive evaluation: at round zero every fact is "new", so the + // delta starts as the full input instance. + let mut delta_owned: Option = if config.semi_naive { + Some(instance.clone()) + } else { + None + }; loop { if steps >= config.max_steps { @@ -158,12 +198,18 @@ pub fn chase_with_config( }; } + let delta = delta_owned.as_ref(); let new_facts = match config.variant { - ChaseVariant::Standard => standard_chase_step(&instance, rules, &mut null_gen), - ChaseVariant::Restricted => { - restricted_chase_step(&instance, rules, &mut null_gen, &mut applied_triggers) - } - ChaseVariant::Oblivious => oblivious_chase_step(&instance, rules, &mut null_gen), + ChaseVariant::Standard => standard_chase_step(&instance, delta, rules, &mut null_gen), + ChaseVariant::Restricted => restricted_chase_step( + &instance, + delta, + rules, + &mut null_gen, + &mut applied_triggers, + ), + ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, rules, &mut null_gen), + ChaseVariant::Skolem => skolem_chase_step(&instance, delta, rules, &mut skolem_gen), }; if new_facts.is_empty() { @@ -176,9 +222,18 @@ pub fn chase_with_config( }; } + let mut next_delta = if config.semi_naive { + Some(Instance::new()) + } else { + None + }; for fact in new_facts { - instance.add(fact); + let added = instance.add(fact.clone()); + if added && let Some(next) = next_delta.as_mut() { + next.add(fact); + } } + delta_owned = next_delta; steps += 1; } } @@ -186,6 +241,7 @@ pub fn chase_with_config( /// Perform a single standard chase step: apply rules without trigger tracking. fn standard_chase_step( instance: &Instance, + delta: Option<&Instance>, rules: &[Rule], null_gen: &mut NullGenerator, ) -> Vec { @@ -193,7 +249,7 @@ fn standard_chase_step( for rule in rules { // Find all ways to match the rule body against the instance - let matches = find_matches(instance, &rule.body); + let matches = find_matches_for_step(instance, delta, &rule.body); for subst in matches { // In standard chase, we only check if head is satisfied @@ -218,6 +274,7 @@ fn standard_chase_step( /// Perform a single restricted chase step: use trigger tracking to avoid redundant applications. fn restricted_chase_step( instance: &Instance, + delta: Option<&Instance>, rules: &[Rule], null_gen: &mut NullGenerator, applied_triggers: &mut HashSet, @@ -226,7 +283,7 @@ fn restricted_chase_step( for (rule_idx, rule) in rules.iter().enumerate() { // Find all ways to match the rule body against the instance - let matches = find_matches(instance, &rule.body); + let matches = find_matches_for_step(instance, delta, &rule.body); for subst in matches { // Create a trigger to check if we've already applied this @@ -262,13 +319,14 @@ fn restricted_chase_step( /// without checking head satisfaction or tracking triggers. fn oblivious_chase_step( instance: &Instance, + delta: Option<&Instance>, rules: &[Rule], null_gen: &mut NullGenerator, ) -> Vec { let mut new_facts = Vec::new(); for rule in rules { - let matches = find_matches(instance, &rule.body); + let matches = find_matches_for_step(instance, delta, &rule.body); for subst in matches { let derived = apply_rule_head(rule, &subst, null_gen); @@ -284,6 +342,36 @@ fn oblivious_chase_step( new_facts } +/// Perform a single Skolem chase step: fire all matching rule applications +/// using deterministic skolem nulls for existential variables. Natural +/// termination comes from the fact that a rule re-applied with the same +/// frontier bindings produces the same head atom, which is already in the +/// instance. +fn skolem_chase_step( + instance: &Instance, + delta: Option<&Instance>, + rules: &[Rule], + skolem_gen: &mut SkolemGenerator, +) -> Vec { + let mut new_facts = Vec::new(); + + for (rule_index, rule) in rules.iter().enumerate() { + let matches = find_matches_for_step(instance, delta, &rule.body); + + for subst in matches { + let derived = apply_rule_head_skolem(rule_index, rule, &subst, skolem_gen); + + for fact in derived { + if !instance.contains(&fact) { + new_facts.push(fact); + } + } + } + } + + new_facts +} + /// A trigger for EGD applications, tracking which EGD was applied with which body bindings. #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct EgdTrigger { @@ -361,10 +449,19 @@ pub fn chase_full( config: ChaseConfig, ) -> ChaseResult { let mut null_gen = NullGenerator::seeded_from(&instance, tgds); + let mut skolem_gen = SkolemGenerator::seeded_from(&instance, tgds); let mut applied_triggers: HashSet = HashSet::new(); let mut applied_egd_triggers: HashSet = HashSet::new(); let mut uf = UnionFind::new(); let mut steps = 0; + // Semi-naive delta: starts as the full instance so the first round + // matches against everything and is empty after canonicalization + // changes (which can rewrite the whole instance). + let mut delta_owned: Option = if config.semi_naive { + Some(instance.clone()) + } else { + None + }; loop { if steps >= config.max_steps { @@ -377,17 +474,27 @@ pub fn chase_full( } // Apply TGDs + let delta = delta_owned.as_ref(); let new_facts = match config.variant { - ChaseVariant::Standard => standard_chase_step(&instance, tgds, &mut null_gen), + ChaseVariant::Standard => standard_chase_step(&instance, delta, tgds, &mut null_gen), ChaseVariant::Restricted => { - restricted_chase_step(&instance, tgds, &mut null_gen, &mut applied_triggers) + restricted_chase_step(&instance, delta, tgds, &mut null_gen, &mut applied_triggers) } - ChaseVariant::Oblivious => oblivious_chase_step(&instance, tgds, &mut null_gen), + ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, tgds, &mut null_gen), + ChaseVariant::Skolem => skolem_chase_step(&instance, delta, tgds, &mut skolem_gen), }; let tgd_changes = !new_facts.is_empty(); + let mut next_delta = if config.semi_naive { + Some(Instance::new()) + } else { + None + }; for fact in new_facts { - instance.add(fact); + let added = instance.add(fact.clone()); + if added && let Some(next) = next_delta.as_mut() { + next.add(fact); + } } // Apply EGDs @@ -406,6 +513,13 @@ pub fn chase_full( // Canonicalize instance if EGDs made changes if egd_changes { instance = instance.canonicalize(&mut uf); + // Canonicalization can rewrite any atom in the instance, + // so the previously computed delta is no longer a sound + // approximation of "new" facts. Reset to the full + // instance for the next round. + if config.semi_naive { + next_delta = Some(instance.clone()); + } } // Check for fixpoint @@ -420,6 +534,7 @@ pub fn chase_full( } } + delta_owned = next_delta; steps += 1; } } @@ -842,6 +957,7 @@ mod tests { let config = ChaseConfig { max_steps: 100, variant: ChaseVariant::Standard, + semi_naive: false, }; let result = chase_full(instance, &[tgd], &[], config); @@ -934,6 +1050,7 @@ mod tests { let config = ChaseConfig { max_steps: 10, variant: ChaseVariant::Oblivious, + semi_naive: false, }; let result = chase_with_config(instance, &[rule], config); @@ -943,6 +1060,181 @@ mod tests { assert!(result.instance.facts_for_predicate("HasSSN").len() > 1); } + // Semi-naive evaluation tests + + #[test] + fn test_semi_naive_matches_naive_for_transitive_closure() { + let instance: Instance = vec![ + Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]), + Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]), + Atom::new("Edge", vec![Term::constant("c"), Term::constant("d")]), + Atom::new("Edge", vec![Term::constant("d"), Term::constant("e")]), + ] + .into_iter() + .collect(); + + let rule1 = RuleBuilder::new() + .when("Edge", vec![Term::var("X"), Term::var("Y")]) + .then("Path", vec![Term::var("X"), Term::var("Y")]) + .build(); + + let rule2 = RuleBuilder::new() + .when("Path", vec![Term::var("X"), Term::var("Y")]) + .when("Edge", vec![Term::var("Y"), Term::var("Z")]) + .then("Path", vec![Term::var("X"), Term::var("Z")]) + .build(); + + let rules = vec![rule1, rule2]; + + let naive = chase(instance.clone(), &rules); + let semi_naive = chase_with_config( + instance, + &rules, + ChaseConfig { + semi_naive: true, + ..Default::default() + }, + ); + + assert!(naive.terminated); + assert!(semi_naive.terminated); + let naive_paths = naive.instance.facts_for_predicate("Path"); + let semi_paths = semi_naive.instance.facts_for_predicate("Path"); + assert_eq!(naive_paths.len(), semi_paths.len()); + // 4-node chain should yield 4 + 3 + 2 + 1 = 10 paths. + assert_eq!(semi_paths.len(), 10); + } + + #[test] + fn test_semi_naive_handles_existentials() { + let instance: Instance = vec![ + Atom::new("Person", vec![Term::constant("alice")]), + Atom::new("Person", vec![Term::constant("bob")]), + ] + .into_iter() + .collect(); + + let rule = RuleBuilder::new() + .when("Person", vec![Term::var("X")]) + .then("HasSSN", vec![Term::var("X"), Term::var("Y")]) + .build(); + + let result = chase_with_config( + instance, + &[rule], + ChaseConfig { + semi_naive: true, + ..Default::default() + }, + ); + + assert!(result.terminated); + let has_ssn = result.instance.facts_for_predicate("HasSSN"); + assert_eq!(has_ssn.len(), 2); + } + + // Skolem chase tests + + #[test] + fn test_skolem_chase_terminates_with_existentials() { + let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])] + .into_iter() + .collect(); + + let rule = RuleBuilder::new() + .when("Person", vec![Term::var("X")]) + .then("HasSSN", vec![Term::var("X"), Term::var("Y")]) + .build(); + + let result = skolem_chase(instance, &[rule]); + + assert!(result.terminated); + let has_ssn = result.instance.facts_for_predicate("HasSSN"); + assert_eq!(has_ssn.len(), 1); + assert!(matches!(has_ssn[0].terms[1], Term::Null(_))); + } + + #[test] + fn test_skolem_chase_reuses_null_for_same_frontier_binding() { + // Two TGDs with the same frontier should produce nulls that the + // chase recognizes as the same value when Skolem is used. + let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])] + .into_iter() + .collect(); + + let rule = RuleBuilder::new() + .when("Person", vec![Term::var("X")]) + .then("HasSSN", vec![Term::var("X"), Term::var("Y")]) + .build(); + + // Run twice on the same input: the Skolem null id should be stable + // within one chase run (re-application is idempotent). + let result1 = skolem_chase(instance.clone(), std::slice::from_ref(&rule)); + let result2 = skolem_chase(instance, &[rule]); + + let f1 = result1.instance.facts_for_predicate("HasSSN"); + let f2 = result2.instance.facts_for_predicate("HasSSN"); + assert_eq!(f1.len(), 1); + assert_eq!(f2.len(), 1); + // Same null id within a single run. + assert_eq!(f1[0].terms[1], f2[0].terms[1]); + } + + #[test] + fn test_skolem_chase_distinct_frontiers_get_distinct_nulls() { + let instance: Instance = vec![ + Atom::new("Person", vec![Term::constant("alice")]), + Atom::new("Person", vec![Term::constant("bob")]), + ] + .into_iter() + .collect(); + + let rule = RuleBuilder::new() + .when("Person", vec![Term::var("X")]) + .then("HasSSN", vec![Term::var("X"), Term::var("Y")]) + .build(); + + let result = skolem_chase(instance, &[rule]); + + assert!(result.terminated); + let has_ssn = result.instance.facts_for_predicate("HasSSN"); + assert_eq!(has_ssn.len(), 2); + let nulls: Vec<_> = has_ssn.iter().map(|f| &f.terms[1]).collect(); + assert_ne!(nulls[0], nulls[1]); + } + + #[test] + fn test_skolem_chase_matches_restricted_for_datalog() { + let instance: Instance = vec![ + Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]), + Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]), + ] + .into_iter() + .collect(); + + let rule1 = RuleBuilder::new() + .when("Edge", vec![Term::var("X"), Term::var("Y")]) + .then("Path", vec![Term::var("X"), Term::var("Y")]) + .build(); + + let rule2 = RuleBuilder::new() + .when("Path", vec![Term::var("X"), Term::var("Y")]) + .when("Edge", vec![Term::var("Y"), Term::var("Z")]) + .then("Path", vec![Term::var("X"), Term::var("Z")]) + .build(); + + let rules = vec![rule1, rule2]; + let skolem = skolem_chase(instance.clone(), &rules); + let restricted = chase(instance, &rules); + + assert!(skolem.terminated); + assert!(restricted.terminated); + assert_eq!( + skolem.instance.facts_for_predicate("Path").len(), + restricted.instance.facts_for_predicate("Path").len(), + ); + } + #[test] fn test_oblivious_chase_via_config() { let instance: Instance = vec![Atom::new("A", vec![Term::constant("x")])] diff --git a/src/chase/inference.rs b/src/chase/inference.rs index bdedac8..d366756 100644 --- a/src/chase/inference.rs +++ b/src/chase/inference.rs @@ -47,6 +47,67 @@ impl NullGenerator { } } +/// A deterministic null generator keyed on rule identity, the name of the +/// existential variable, and the frontier-variable bindings of the current +/// rule application. +/// +/// The Skolem chase uses this so that the same rule application with the same +/// frontier bindings always yields the same labeled null, giving natural +/// termination for rules whose re-application would otherwise invent fresh +/// nulls forever. +#[derive(Debug, Default)] +pub(crate) struct SkolemGenerator { + counter: usize, + cache: HashMap, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct SkolemKey { + rule_index: usize, + existential: String, + frontier_bindings: Vec<(String, Term)>, +} + +impl SkolemGenerator { + pub(crate) fn seeded_from(instance: &Instance, rules: &[Rule]) -> Self { + Self { + counter: next_null_id(instance, rules), + cache: HashMap::new(), + } + } + + pub(crate) fn skolem_for( + &mut self, + rule_index: usize, + rule: &Rule, + existential: &str, + subst: &Substitution, + ) -> Term { + let frontier = rule.frontier_variables(); + let mut bindings: Vec<_> = frontier + .into_iter() + .filter_map(|variable| subst.get(&variable).map(|term| (variable, term.clone()))) + .collect(); + bindings.sort_by(|left, right| left.0.cmp(&right.0)); + + let key = SkolemKey { + rule_index, + existential: existential.to_string(), + frontier_bindings: bindings, + }; + + if let Some(term) = self.cache.get(&key) { + return term.clone(); + } + + let id = self.counter; + self.counter += 1; + let term = Term::Null(id); + self.cache.insert(key, term.clone()); + term + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub(crate) struct Trigger { rule_index: usize, @@ -159,6 +220,67 @@ pub fn find_matches(instance: &Instance, body: &[Atom]) -> Vec { results } +/// Compute body matches for a chase step, optionally using semi-naive +/// evaluation against a delta of facts added in the previous round. +/// +/// When `delta` is `None`, this matches the body against the full instance. +/// When `delta` is `Some`, the result is the union over each body position `i` +/// of matches that bind position `i` against `delta` and the remaining +/// positions against the full instance. Body matches that do not involve any +/// fact from `delta` are skipped, since they would have been produced in an +/// earlier round. +pub(crate) fn find_matches_for_step( + instance: &Instance, + delta: Option<&Instance>, + body: &[Atom], +) -> Vec { + let Some(delta) = delta else { + return find_matches(instance, body); + }; + + if body.is_empty() { + // Rules with empty bodies fire once at round zero and have no + // body atom to anchor against `delta` afterwards. + return Vec::new(); + } + + let mut all = Vec::new(); + for delta_index in 0..body.len() { + let mut results = vec![Substitution::new()]; + + for (position, body_atom) in body.iter().enumerate() { + let target = if position == delta_index { + delta + } else { + instance + }; + + let mut new_results = Vec::new(); + for subst in &results { + let pattern = subst.apply_atom(body_atom); + for fact in target.facts_matching_pattern(&pattern) { + if let Some(next_subst) = unify_atom(&pattern, fact) { + let mut combined = subst.clone(); + for (var, term) in next_subst.iter() { + combined.bind(var.clone(), term.clone()); + } + new_results.push(combined); + } + } + } + + results = new_results; + if results.is_empty() { + break; + } + } + + all.extend(results); + } + + all +} + impl MaterializedState { pub fn provenance_for(&self, atom: &Atom) -> Option<&Derivation> { self.provenance.get(atom) @@ -194,6 +316,29 @@ pub(crate) fn apply_rule_head( .collect() } +/// Like [`apply_rule_head`], but binds existential variables to deterministic +/// Skolem nulls based on `(rule_index, existential_name, frontier_bindings)`. +pub(crate) fn apply_rule_head_skolem( + rule_index: usize, + rule: &Rule, + subst: &Substitution, + skolem_gen: &mut SkolemGenerator, +) -> Vec { + let mut extended_subst = subst.clone(); + let mut existentials = rule.existential_variables().into_iter().collect::>(); + existentials.sort(); + + for variable in existentials { + let term = skolem_gen.skolem_for(rule_index, rule, &variable, subst); + extended_subst.bind(variable, term); + } + + rule.head + .iter() + .map(|atom| extended_subst.apply_atom(atom)) + .collect() +} + pub(crate) fn next_null_id(instance: &Instance, rules: &[Rule]) -> usize { let instance_max = instance .iter() diff --git a/src/chase/mod.rs b/src/chase/mod.rs index 48ad544..3b21b07 100644 --- a/src/chase/mod.rs +++ b/src/chase/mod.rs @@ -13,7 +13,7 @@ mod engine; pub use atom::Atom; pub use engine::{ ChaseConfig, ChaseError, ChaseResult, ChaseVariant, chase, chase_full, chase_with_config, - chase_with_egds, oblivious_chase, standard_chase, + chase_with_egds, oblivious_chase, skolem_chase, standard_chase, }; pub use inference::{Derivation, MaterializedState, find_matches, materialize}; pub use instance::{Instance, InstanceError}; diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 6b83f74..5f1cd7c 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -4,6 +4,7 @@ //! provides table scans. The built-in [`Instance`](crate::chase::Instance) //! adapter and the [`TableStore`] are the two provided implementations. +pub mod physical; pub mod table_store; use std::cmp::Ordering; @@ -14,6 +15,9 @@ use crate::chase::{Instance, Term}; use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; use crate::relational::{ResultSet, Row, Schema, Value}; +pub use physical::{ + NamedPhysicalExpr, PhysicalPlan, execute_physical, plan_physical, rewrite_physical, +}; pub use table_store::TableStore; /// Errors returned by the current logical-plan executor. diff --git a/src/execution/physical.rs b/src/execution/physical.rs new file mode 100644 index 0000000..0e467c8 --- /dev/null +++ b/src/execution/physical.rs @@ -0,0 +1,470 @@ +//! Physical operator scaffolding. +//! +//! Logical plans describe *what* the query computes; physical plans describe +//! *how* it is executed. Today the physical layer mirrors the logical layer +//! one-to-one. The split exists so future work can add operator strategies +//! (for example, a hash join physical operator alongside the current +//! nested-loop join) without changing logical semantics. +//! +//! The current layer is intentionally narrow: +//! +//! - [`PhysicalPlan`] mirrors [`LogicalPlan`] with execution-oriented names. +//! - [`plan_physical`] converts a logical plan into a physical plan. +//! - [`rewrite_physical`] applies a small set of rule-based rewrites. +//! - [`execute_physical`] runs the physical plan against a [`DataSource`]. + +use std::cmp::Ordering; + +use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; +use crate::relational::{ResultSet, Row, Schema, Value}; + +use super::{DataSource, ExecutionError}; + +/// A physical plan node in the current execution subset. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PhysicalPlan { + /// Sequentially scan all rows of a table from the data source. + SeqScan { table: String, schema: Schema }, + /// Form the Cartesian product of two inputs by iterating the right + /// input for each row of the left input. + NestedLoopJoin { + left: Box, + right: Box, + schema: Schema, + }, + /// Filter rows by a predicate expression. + Filter { + input: Box, + predicate: LogicalExpr, + }, + /// Sort rows by one or more output columns. + Sort { + input: Box, + keys: Vec, + schema: Schema, + }, + /// Project a new output schema by evaluating expressions per row. + Project { + input: Box, + expressions: Vec, + schema: Schema, + }, + /// Limit the number of output rows. + Limit { + input: Box, + count: usize, + }, +} + +/// A named physical expression in a projection. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NamedPhysicalExpr { + /// Output column name. + pub name: String, + /// Expression to evaluate. + pub expr: LogicalExpr, +} + +impl PhysicalPlan { + /// Return the schema produced by this physical plan. + pub fn output_schema(&self) -> &Schema { + match self { + Self::SeqScan { schema, .. } => schema, + Self::NestedLoopJoin { schema, .. } => schema, + Self::Filter { input, .. } => input.output_schema(), + Self::Sort { schema, .. } => schema, + Self::Project { schema, .. } => schema, + Self::Limit { input, .. } => input.output_schema(), + } + } +} + +/// Translate a [`LogicalPlan`] into a [`PhysicalPlan`]. +/// +/// This is currently a one-to-one mapping. Logical `Scan` becomes physical +/// `SeqScan`, logical `CrossJoin` becomes physical `NestedLoopJoin`, and +/// other operators keep their names. Future strategy choices belong here. +pub fn plan_physical(plan: &LogicalPlan) -> PhysicalPlan { + match plan { + LogicalPlan::Scan { table, schema } => PhysicalPlan::SeqScan { + table: table.clone(), + schema: schema.clone(), + }, + LogicalPlan::CrossJoin { + left, + right, + schema, + } => PhysicalPlan::NestedLoopJoin { + left: Box::new(plan_physical(left)), + right: Box::new(plan_physical(right)), + schema: schema.clone(), + }, + LogicalPlan::Filter { input, predicate } => PhysicalPlan::Filter { + input: Box::new(plan_physical(input)), + predicate: predicate.clone(), + }, + LogicalPlan::Sort { + input, + keys, + schema, + } => PhysicalPlan::Sort { + input: Box::new(plan_physical(input)), + keys: keys.clone(), + schema: schema.clone(), + }, + LogicalPlan::Project { + input, + expressions, + schema, + } => PhysicalPlan::Project { + input: Box::new(plan_physical(input)), + expressions: expressions + .iter() + .map(|named| NamedPhysicalExpr { + name: named.name.clone(), + expr: named.expr.clone(), + }) + .collect(), + schema: schema.clone(), + }, + LogicalPlan::Limit { input, count } => PhysicalPlan::Limit { + input: Box::new(plan_physical(input)), + count: *count, + }, + } +} + +/// Apply rule-based rewrites to a physical plan. +/// +/// Today the only rewrite is `combine_adjacent_limits`, which collapses +/// `Limit(Limit(child, n), m)` into `Limit(child, min(n, m))`. Future +/// rewrites belong here as additional functions composed in this entry +/// point. +pub fn rewrite_physical(plan: PhysicalPlan) -> PhysicalPlan { + combine_adjacent_limits(plan) +} + +fn combine_adjacent_limits(plan: PhysicalPlan) -> PhysicalPlan { + match plan { + PhysicalPlan::Limit { input, count } => { + let inner = combine_adjacent_limits(*input); + match inner { + PhysicalPlan::Limit { + input: child, + count: inner_count, + } => PhysicalPlan::Limit { + input: child, + count: count.min(inner_count), + }, + other => PhysicalPlan::Limit { + input: Box::new(other), + count, + }, + } + } + PhysicalPlan::NestedLoopJoin { + left, + right, + schema, + } => PhysicalPlan::NestedLoopJoin { + left: Box::new(combine_adjacent_limits(*left)), + right: Box::new(combine_adjacent_limits(*right)), + schema, + }, + PhysicalPlan::Filter { input, predicate } => PhysicalPlan::Filter { + input: Box::new(combine_adjacent_limits(*input)), + predicate, + }, + PhysicalPlan::Sort { + input, + keys, + schema, + } => PhysicalPlan::Sort { + input: Box::new(combine_adjacent_limits(*input)), + keys, + schema, + }, + PhysicalPlan::Project { + input, + expressions, + schema, + } => PhysicalPlan::Project { + input: Box::new(combine_adjacent_limits(*input)), + expressions, + schema, + }, + leaf @ PhysicalPlan::SeqScan { .. } => leaf, + } +} + +/// Execute a physical plan against the provided data source. +pub fn execute_physical( + plan: &PhysicalPlan, + source: &dyn DataSource, +) -> Result { + match plan { + PhysicalPlan::SeqScan { table, schema } => source.scan(table, schema), + PhysicalPlan::NestedLoopJoin { + left, + right, + schema, + } => { + let left_result = execute_physical(left, source)?; + let right_result = execute_physical(right, source)?; + let mut rows = Vec::new(); + + for left_row in left_result.rows() { + for right_row in right_result.rows() { + let mut values = left_row.values().to_vec(); + values.extend_from_slice(right_row.values()); + rows.push(Row::new(values)); + } + } + + Ok(ResultSet::new(schema.clone(), rows)) + } + PhysicalPlan::Filter { input, predicate } => { + let result = execute_physical(input, source)?; + let filtered_rows = result + .rows() + .iter() + .filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false)) + .cloned() + .collect(); + Ok(ResultSet::new(result.schema().clone(), filtered_rows)) + } + PhysicalPlan::Project { + input, + expressions, + schema, + } => { + let result = execute_physical(input, source)?; + let mut rows = Vec::new(); + for row in result.rows() { + let values = expressions + .iter() + .map(|named| eval_expr(&named.expr, row, result.schema())) + .collect::, _>>()?; + rows.push(Row::new(values)); + } + Ok(ResultSet::new(schema.clone(), rows)) + } + PhysicalPlan::Sort { + input, + keys, + schema, + } => { + let result = execute_physical(input, source)?; + let mut rows = result.rows().to_vec(); + let resolved = resolve_sort_keys(keys, result.schema())?; + rows.sort_by(|left, right| compare_rows(left, right, &resolved)); + Ok(ResultSet::new(schema.clone(), rows)) + } + PhysicalPlan::Limit { input, count } => { + let result = execute_physical(input, source)?; + let rows = result.rows().iter().take(*count).cloned().collect(); + Ok(ResultSet::new(result.schema().clone(), rows)) + } + } +} + +fn eval_predicate(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result { + match expr { + LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)? + .sql_eq(&eval_expr(right, row, schema)?) + .unwrap_or(false)), + LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)? + .sql_eq(&eval_expr(right, row, schema)?) + .map(|eq| !eq) + .unwrap_or(false)), + LogicalExpr::And(left, right) => { + Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?) + } + LogicalExpr::Or(left, right) => { + Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?) + } + _ => Ok(false), + } +} + +fn eval_expr(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result { + match expr { + LogicalExpr::Column(name) => { + let index = schema + .index_of(name) + .ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?; + Ok(row.get(index).cloned().unwrap_or(Value::Null)) + } + LogicalExpr::Literal(value) => Ok(value.clone()), + LogicalExpr::Eq(left, right) => { + let left = eval_expr(left, row, schema)?; + let right = eval_expr(right, row, schema)?; + Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false))) + } + LogicalExpr::Ne(left, right) => { + let left = eval_expr(left, row, schema)?; + let right = eval_expr(right, row, schema)?; + Ok(Value::Boolean( + left.sql_eq(&right).map(|eq| !eq).unwrap_or(false), + )) + } + LogicalExpr::And(left, right) => Ok(Value::Boolean( + eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?, + )), + LogicalExpr::Or(left, right) => Ok(Value::Boolean( + eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?, + )), + } +} + +fn resolve_sort_keys( + keys: &[SortKey], + schema: &Schema, +) -> Result, ExecutionError> { + keys.iter() + .map(|key| { + let index = schema + .index_of(&key.column) + .ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?; + Ok((index, key.direction)) + }) + .collect() +} + +fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering { + for (index, direction) in keys { + let left_value = left.get(*index).unwrap_or(&Value::Null); + let right_value = right.get(*index).unwrap_or(&Value::Null); + let ordering = compare_values(left_value, right_value); + if ordering != Ordering::Equal { + return match direction { + SortDirection::Asc => ordering, + SortDirection::Desc => ordering.reverse(), + }; + } + } + + Ordering::Equal +} + +fn compare_values(left: &Value, right: &Value) -> Ordering { + match (left, right) { + (Value::Null, Value::Null) => Ordering::Equal, + (Value::Null, _) => Ordering::Greater, + (_, Value::Null) => Ordering::Less, + (Value::Text(left), Value::Text(right)) => left.cmp(right), + (Value::Integer(left), Value::Integer(right)) => left.cmp(right), + (Value::Boolean(left), Value::Boolean(right)) => left.cmp(right), + (Value::Integer(_), _) => Ordering::Less, + (_, Value::Integer(_)) => Ordering::Greater, + (Value::Text(_), Value::Boolean(_)) => Ordering::Less, + (Value::Boolean(_), Value::Text(_)) => Ordering::Greater, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::chase::{Atom, Instance, Term}; + use crate::planner::logical::{LogicalPlan, NamedExpr}; + use crate::relational::{DataType, Field}; + + fn parent_instance() -> Instance { + vec![ + Atom::new( + "Parent", + vec![Term::constant("alice"), Term::constant("bob")], + ), + Atom::new( + "Parent", + vec![Term::constant("bob"), Term::constant("carol")], + ), + ] + .into_iter() + .collect() + } + + fn parent_schema() -> Schema { + Schema::new(vec![ + Field::new("c0", DataType::Text, false), + Field::new("c1", DataType::Text, false), + ]) + } + + #[test] + fn plan_physical_mirrors_logical_scan() { + let logical = LogicalPlan::Scan { + table: "Parent".to_string(), + schema: parent_schema(), + }; + let physical = plan_physical(&logical); + assert!(matches!(physical, PhysicalPlan::SeqScan { .. })); + } + + #[test] + fn execute_physical_runs_scan_and_limit() { + let schema = parent_schema(); + let logical = LogicalPlan::Limit { + input: Box::new(LogicalPlan::Scan { + table: "Parent".to_string(), + schema: schema.clone(), + }), + count: 1, + }; + + let physical = plan_physical(&logical); + let result = execute_physical(&physical, &parent_instance()).unwrap(); + assert_eq!(result.rows().len(), 1); + } + + #[test] + fn rewrite_collapses_adjacent_limits() { + let schema = parent_schema(); + let nested = PhysicalPlan::Limit { + input: Box::new(PhysicalPlan::Limit { + input: Box::new(PhysicalPlan::SeqScan { + table: "Parent".to_string(), + schema, + }), + count: 5, + }), + count: 2, + }; + + let rewritten = rewrite_physical(nested); + match rewritten { + PhysicalPlan::Limit { input, count } => { + assert_eq!(count, 2); + assert!(matches!(*input, PhysicalPlan::SeqScan { .. })); + } + other => panic!("expected single Limit, got {:?}", other), + } + } + + #[test] + fn execute_physical_handles_projection_and_filter() { + let schema = parent_schema(); + let logical = LogicalPlan::Project { + input: Box::new(LogicalPlan::Filter { + input: Box::new(LogicalPlan::Scan { + table: "Parent".to_string(), + schema: schema.clone(), + }), + predicate: LogicalExpr::Eq( + Box::new(LogicalExpr::Column("c1".to_string())), + Box::new(LogicalExpr::Literal(Value::text("bob"))), + ), + }), + expressions: vec![NamedExpr { + name: "c0".to_string(), + expr: LogicalExpr::Column("c0".to_string()), + }], + schema: Schema::new(vec![Field::new("c0", DataType::Text, false)]), + }; + + let physical = rewrite_physical(plan_physical(&logical)); + let result = execute_physical(&physical, &parent_instance()).unwrap(); + assert_eq!(result.rows().len(), 1); + assert_eq!(result.rows()[0].values()[0], Value::text("alice")); + } +} diff --git a/src/lib.rs b/src/lib.rs index 56096f8..086587b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,5 +18,5 @@ pub mod sql; // Lower-level reasoning and provenance APIs remain under `query_engine::chase`. pub use chase::{ Atom, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, Instance, Rule, RuleBuilder, Term, - chase, chase_with_config, oblivious_chase, standard_chase, + chase, chase_with_config, oblivious_chase, skolem_chase, standard_chase, };