Add Skolem chase and semi-naive evaluation support

This commit is contained in:
Hassan Abedi 2026-04-13 11:01:30 +02:00
parent f0d22976c7
commit 57a6eaaef6
9 changed files with 937 additions and 24 deletions

View File

@ -59,7 +59,7 @@ Quick examples:
- `src/catalog/`: predicate-to-table schema inference and catalog access. - `src/catalog/`: predicate-to-table schema inference and catalog access.
- `src/sql/`: narrow SQL AST and parser support. - `src/sql/`: narrow SQL AST and parser support.
- `src/planner/`: logical plan structures and SQL-to-plan translation. - `src/planner/`: logical plan structures and SQL-to-plan translation.
- `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait and the `TableStore` in-memory source. - `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait, the `TableStore` in-memory source, and the physical operator layer in `physical.rs` with rule-based rewrites.
- `examples/scripts/`: runnable script examples for supported workflows. - `examples/scripts/`: runnable script examples for supported workflows.
- `tests/`: integration, regression, and property-based tests. - `tests/`: integration, regression, and property-based tests.

View File

@ -10,10 +10,12 @@ execution boundaries.
### Current scope ### Current scope
- Chase-based rule evaluation over facts, rules, and substitutions - Chase-based rule evaluation over facts, rules, and substitutions
- Restricted-chase style materialization with active-trigger checks - Restricted, standard, oblivious, and Skolem chase variants
- Optional semi-naive evaluation across all chase variants
- Provenance-oriented explanations for derived answers - Provenance-oriented explanations for derived answers
- Script, REPL, and local web UI for experimentation - Script, REPL, and local web UI for experimentation
- Relational schema, catalog, logical-plan, and execution scaffolding - Relational schema, catalog, logical-plan, and execution scaffolding
- Physical operator scaffolding with a small rule-based rewrite layer
- A minimal SQL slice for `SELECT-FROM-WHERE-ORDER BY-LIMIT` queries over predicate-backed tables - A minimal SQL slice for `SELECT-FROM-WHERE-ORDER BY-LIMIT` queries over predicate-backed tables
### Architecture ### Architecture
@ -26,7 +28,7 @@ The repository is currently organized around a few clear subsystems:
- `src/catalog/`: predicate-backed table metadata - `src/catalog/`: predicate-backed table metadata
- `src/sql/`: minimal SQL AST and parser - `src/sql/`: minimal SQL AST and parser
- `src/planner/`: logical plan structures and SQL-to-plan translation - `src/planner/`: logical plan structures and SQL-to-plan translation
- `src/execution/`: execution for the current logical-plan subset, `DataSource` trait, and `TableStore` - `src/execution/`: execution for the current logical-plan subset, the `DataSource` trait, the `TableStore`, and a physical operator layer with rule-based rewrites
Today, the chase subsystem is still the most mature part of the codebase. The Today, the chase subsystem is still the most mature part of the codebase. The
relational and SQL modules are present to create clean extension points for a relational and SQL modules are present to create clean extension points for a

View File

@ -65,9 +65,9 @@ This document tracks the current state and next steps for the repository.
### Execution and Optimization ### Execution and Optimization
- [ ] Introduce physical operator abstractions - [x] Introduce physical operator abstractions
- [x] Add a planning step from logical operators to executable operators - [x] Add a planning step from logical operators to executable operators
- [ ] Add basic rule-based logical rewrites - [x] Add basic rule-based logical rewrites
- [ ] Add statistics and cost-model scaffolding - [ ] Add statistics and cost-model scaffolding
- [ ] Add indexing and access-path abstractions - [ ] Add indexing and access-path abstractions
@ -76,13 +76,13 @@ This document tracks the current state and next steps for the repository.
- [x] Restricted chase - [x] Restricted chase
- [x] Standard chase - [x] Standard chase
- [x] Oblivious chase - [x] Oblivious chase
- [ ] Skolem chase - [x] Skolem chase
- [ ] Core chase - [ ] Core chase
- [ ] Negative constraints - [ ] Negative constraints
- [ ] Stratified negation in rule bodies - [ ] Stratified negation in rule bodies
- [ ] Disjunctive heads - [ ] Disjunctive heads
- [ ] Aggregation support in rule evaluation - [ ] Aggregation support in rule evaluation
- [ ] Semi-naive evaluation - [x] Semi-naive evaluation
- [ ] Termination analysis helpers - [ ] Termination analysis helpers
### Data and Interoperability ### Data and Interoperability
@ -95,7 +95,7 @@ This document tracks the current state and next steps for the repository.
### Performance and Reliability ### Performance and Reliability
- [ ] Predicate indexing for fact lookup - [x] Predicate indexing for fact lookup
- [ ] Incremental evaluation - [ ] Incremental evaluation
- [ ] Benchmarks - [ ] Benchmarks
- [ ] Fuzzing - [ ] Fuzzing

View File

@ -5,7 +5,10 @@ use std::error::Error;
use std::fmt; use std::fmt;
use super::atom::Atom; use super::atom::Atom;
use super::inference::{NullGenerator, Trigger, apply_rule_head, find_matches, head_is_satisfied}; use super::inference::{
NullGenerator, SkolemGenerator, Trigger, apply_rule_head, apply_rule_head_skolem, find_matches,
find_matches_for_step, head_is_satisfied,
};
use super::instance::Instance; use super::instance::Instance;
use super::rule::{Egd, Rule}; use super::rule::{Egd, Rule};
use super::substitution::Substitution; use super::substitution::Substitution;
@ -70,6 +73,13 @@ pub enum ChaseVariant {
/// variables this variant will typically not terminate (it will hit the /// variables this variant will typically not terminate (it will hit the
/// step limit) because each application generates fresh nulls. /// step limit) because each application generates fresh nulls.
Oblivious, Oblivious,
/// Skolem chase: fires every matching rule application, like oblivious,
/// but binds each existential variable to a deterministic (skolem) null
/// keyed on the rule identity, the existential variable, and the
/// frontier-variable bindings. Re-application with the same frontier
/// bindings produces the same head atom, so the chase terminates for many
/// schemas where the oblivious variant does not.
Skolem,
} }
/// Configuration for the chase algorithm. /// Configuration for the chase algorithm.
@ -79,6 +89,11 @@ pub struct ChaseConfig {
pub max_steps: usize, pub max_steps: usize,
/// The chase variant to use. /// The chase variant to use.
pub variant: ChaseVariant, pub variant: ChaseVariant,
/// When true, the chase uses semi-naive evaluation: each round only
/// considers rule applications that involve at least one fact added in
/// the previous round, instead of re-matching the entire instance. This
/// is a pure performance switch and does not change the chase result.
pub semi_naive: bool,
} }
impl Default for ChaseConfig { impl Default for ChaseConfig {
@ -86,6 +101,7 @@ impl Default for ChaseConfig {
ChaseConfig { ChaseConfig {
max_steps: 10_000, max_steps: 10_000,
variant: ChaseVariant::default(), variant: ChaseVariant::default(),
semi_naive: false,
} }
} }
} }
@ -138,6 +154,22 @@ pub fn oblivious_chase(instance: Instance, rules: &[Rule]) -> ChaseResult {
chase_with_config(instance, rules, config) chase_with_config(instance, rules, config)
} }
/// Run the Skolem chase algorithm.
///
/// The Skolem chase fires every matching rule application, like the
/// oblivious variant, but binds each existential variable to a deterministic
/// null keyed on the rule, the variable, and the frontier-variable bindings.
/// Re-application with the same frontier bindings reuses the same null, so
/// the chase terminates whenever the set of derivable facts is finite, even
/// in the presence of existentials.
pub fn skolem_chase(instance: Instance, rules: &[Rule]) -> ChaseResult {
let config = ChaseConfig {
variant: ChaseVariant::Skolem,
..Default::default()
};
chase_with_config(instance, rules, config)
}
/// Run the chase with custom configuration. /// Run the chase with custom configuration.
pub fn chase_with_config( pub fn chase_with_config(
mut instance: Instance, mut instance: Instance,
@ -145,8 +177,16 @@ pub fn chase_with_config(
config: ChaseConfig, config: ChaseConfig,
) -> ChaseResult { ) -> ChaseResult {
let mut null_gen = NullGenerator::seeded_from(&instance, rules); let mut null_gen = NullGenerator::seeded_from(&instance, rules);
let mut skolem_gen = SkolemGenerator::seeded_from(&instance, rules);
let mut applied_triggers: HashSet<Trigger> = HashSet::new(); let mut applied_triggers: HashSet<Trigger> = HashSet::new();
let mut steps = 0; let mut steps = 0;
// For semi-naive evaluation: at round zero every fact is "new", so the
// delta starts as the full input instance.
let mut delta_owned: Option<Instance> = if config.semi_naive {
Some(instance.clone())
} else {
None
};
loop { loop {
if steps >= config.max_steps { if steps >= config.max_steps {
@ -158,12 +198,18 @@ pub fn chase_with_config(
}; };
} }
let delta = delta_owned.as_ref();
let new_facts = match config.variant { let new_facts = match config.variant {
ChaseVariant::Standard => standard_chase_step(&instance, rules, &mut null_gen), ChaseVariant::Standard => standard_chase_step(&instance, delta, rules, &mut null_gen),
ChaseVariant::Restricted => { ChaseVariant::Restricted => restricted_chase_step(
restricted_chase_step(&instance, rules, &mut null_gen, &mut applied_triggers) &instance,
} delta,
ChaseVariant::Oblivious => oblivious_chase_step(&instance, rules, &mut null_gen), rules,
&mut null_gen,
&mut applied_triggers,
),
ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, rules, &mut null_gen),
ChaseVariant::Skolem => skolem_chase_step(&instance, delta, rules, &mut skolem_gen),
}; };
if new_facts.is_empty() { if new_facts.is_empty() {
@ -176,9 +222,18 @@ pub fn chase_with_config(
}; };
} }
let mut next_delta = if config.semi_naive {
Some(Instance::new())
} else {
None
};
for fact in new_facts { for fact in new_facts {
instance.add(fact); let added = instance.add(fact.clone());
if added && let Some(next) = next_delta.as_mut() {
next.add(fact);
}
} }
delta_owned = next_delta;
steps += 1; steps += 1;
} }
} }
@ -186,6 +241,7 @@ pub fn chase_with_config(
/// Perform a single standard chase step: apply rules without trigger tracking. /// Perform a single standard chase step: apply rules without trigger tracking.
fn standard_chase_step( fn standard_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
) -> Vec<Atom> { ) -> Vec<Atom> {
@ -193,7 +249,7 @@ fn standard_chase_step(
for rule in rules { for rule in rules {
// Find all ways to match the rule body against the instance // Find all ways to match the rule body against the instance
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
// In standard chase, we only check if head is satisfied // In standard chase, we only check if head is satisfied
@ -218,6 +274,7 @@ fn standard_chase_step(
/// Perform a single restricted chase step: use trigger tracking to avoid redundant applications. /// Perform a single restricted chase step: use trigger tracking to avoid redundant applications.
fn restricted_chase_step( fn restricted_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
applied_triggers: &mut HashSet<Trigger>, applied_triggers: &mut HashSet<Trigger>,
@ -226,7 +283,7 @@ fn restricted_chase_step(
for (rule_idx, rule) in rules.iter().enumerate() { for (rule_idx, rule) in rules.iter().enumerate() {
// Find all ways to match the rule body against the instance // Find all ways to match the rule body against the instance
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
// Create a trigger to check if we've already applied this // Create a trigger to check if we've already applied this
@ -262,13 +319,14 @@ fn restricted_chase_step(
/// without checking head satisfaction or tracking triggers. /// without checking head satisfaction or tracking triggers.
fn oblivious_chase_step( fn oblivious_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
) -> Vec<Atom> { ) -> Vec<Atom> {
let mut new_facts = Vec::new(); let mut new_facts = Vec::new();
for rule in rules { for rule in rules {
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
let derived = apply_rule_head(rule, &subst, null_gen); let derived = apply_rule_head(rule, &subst, null_gen);
@ -284,6 +342,36 @@ fn oblivious_chase_step(
new_facts new_facts
} }
/// Perform a single Skolem chase step: fire all matching rule applications
/// using deterministic skolem nulls for existential variables. Natural
/// termination comes from the fact that a rule re-applied with the same
/// frontier bindings produces the same head atom, which is already in the
/// instance.
fn skolem_chase_step(
instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule],
skolem_gen: &mut SkolemGenerator,
) -> Vec<Atom> {
let mut new_facts = Vec::new();
for (rule_index, rule) in rules.iter().enumerate() {
let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches {
let derived = apply_rule_head_skolem(rule_index, rule, &subst, skolem_gen);
for fact in derived {
if !instance.contains(&fact) {
new_facts.push(fact);
}
}
}
}
new_facts
}
/// A trigger for EGD applications, tracking which EGD was applied with which body bindings. /// A trigger for EGD applications, tracking which EGD was applied with which body bindings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct EgdTrigger { struct EgdTrigger {
@ -361,10 +449,19 @@ pub fn chase_full(
config: ChaseConfig, config: ChaseConfig,
) -> ChaseResult { ) -> ChaseResult {
let mut null_gen = NullGenerator::seeded_from(&instance, tgds); let mut null_gen = NullGenerator::seeded_from(&instance, tgds);
let mut skolem_gen = SkolemGenerator::seeded_from(&instance, tgds);
let mut applied_triggers: HashSet<Trigger> = HashSet::new(); let mut applied_triggers: HashSet<Trigger> = HashSet::new();
let mut applied_egd_triggers: HashSet<EgdTrigger> = HashSet::new(); let mut applied_egd_triggers: HashSet<EgdTrigger> = HashSet::new();
let mut uf = UnionFind::new(); let mut uf = UnionFind::new();
let mut steps = 0; let mut steps = 0;
// Semi-naive delta: starts as the full instance so the first round
// matches against everything and is empty after canonicalization
// changes (which can rewrite the whole instance).
let mut delta_owned: Option<Instance> = if config.semi_naive {
Some(instance.clone())
} else {
None
};
loop { loop {
if steps >= config.max_steps { if steps >= config.max_steps {
@ -377,17 +474,27 @@ pub fn chase_full(
} }
// Apply TGDs // Apply TGDs
let delta = delta_owned.as_ref();
let new_facts = match config.variant { let new_facts = match config.variant {
ChaseVariant::Standard => standard_chase_step(&instance, tgds, &mut null_gen), ChaseVariant::Standard => standard_chase_step(&instance, delta, tgds, &mut null_gen),
ChaseVariant::Restricted => { ChaseVariant::Restricted => {
restricted_chase_step(&instance, tgds, &mut null_gen, &mut applied_triggers) restricted_chase_step(&instance, delta, tgds, &mut null_gen, &mut applied_triggers)
} }
ChaseVariant::Oblivious => oblivious_chase_step(&instance, tgds, &mut null_gen), ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, tgds, &mut null_gen),
ChaseVariant::Skolem => skolem_chase_step(&instance, delta, tgds, &mut skolem_gen),
}; };
let tgd_changes = !new_facts.is_empty(); let tgd_changes = !new_facts.is_empty();
let mut next_delta = if config.semi_naive {
Some(Instance::new())
} else {
None
};
for fact in new_facts { for fact in new_facts {
instance.add(fact); let added = instance.add(fact.clone());
if added && let Some(next) = next_delta.as_mut() {
next.add(fact);
}
} }
// Apply EGDs // Apply EGDs
@ -406,6 +513,13 @@ pub fn chase_full(
// Canonicalize instance if EGDs made changes // Canonicalize instance if EGDs made changes
if egd_changes { if egd_changes {
instance = instance.canonicalize(&mut uf); instance = instance.canonicalize(&mut uf);
// Canonicalization can rewrite any atom in the instance,
// so the previously computed delta is no longer a sound
// approximation of "new" facts. Reset to the full
// instance for the next round.
if config.semi_naive {
next_delta = Some(instance.clone());
}
} }
// Check for fixpoint // Check for fixpoint
@ -420,6 +534,7 @@ pub fn chase_full(
} }
} }
delta_owned = next_delta;
steps += 1; steps += 1;
} }
} }
@ -842,6 +957,7 @@ mod tests {
let config = ChaseConfig { let config = ChaseConfig {
max_steps: 100, max_steps: 100,
variant: ChaseVariant::Standard, variant: ChaseVariant::Standard,
semi_naive: false,
}; };
let result = chase_full(instance, &[tgd], &[], config); let result = chase_full(instance, &[tgd], &[], config);
@ -934,6 +1050,7 @@ mod tests {
let config = ChaseConfig { let config = ChaseConfig {
max_steps: 10, max_steps: 10,
variant: ChaseVariant::Oblivious, variant: ChaseVariant::Oblivious,
semi_naive: false,
}; };
let result = chase_with_config(instance, &[rule], config); let result = chase_with_config(instance, &[rule], config);
@ -943,6 +1060,181 @@ mod tests {
assert!(result.instance.facts_for_predicate("HasSSN").len() > 1); assert!(result.instance.facts_for_predicate("HasSSN").len() > 1);
} }
// Semi-naive evaluation tests
#[test]
fn test_semi_naive_matches_naive_for_transitive_closure() {
let instance: Instance = vec![
Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]),
Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]),
Atom::new("Edge", vec![Term::constant("c"), Term::constant("d")]),
Atom::new("Edge", vec![Term::constant("d"), Term::constant("e")]),
]
.into_iter()
.collect();
let rule1 = RuleBuilder::new()
.when("Edge", vec![Term::var("X"), Term::var("Y")])
.then("Path", vec![Term::var("X"), Term::var("Y")])
.build();
let rule2 = RuleBuilder::new()
.when("Path", vec![Term::var("X"), Term::var("Y")])
.when("Edge", vec![Term::var("Y"), Term::var("Z")])
.then("Path", vec![Term::var("X"), Term::var("Z")])
.build();
let rules = vec![rule1, rule2];
let naive = chase(instance.clone(), &rules);
let semi_naive = chase_with_config(
instance,
&rules,
ChaseConfig {
semi_naive: true,
..Default::default()
},
);
assert!(naive.terminated);
assert!(semi_naive.terminated);
let naive_paths = naive.instance.facts_for_predicate("Path");
let semi_paths = semi_naive.instance.facts_for_predicate("Path");
assert_eq!(naive_paths.len(), semi_paths.len());
// 4-node chain should yield 4 + 3 + 2 + 1 = 10 paths.
assert_eq!(semi_paths.len(), 10);
}
#[test]
fn test_semi_naive_handles_existentials() {
let instance: Instance = vec![
Atom::new("Person", vec![Term::constant("alice")]),
Atom::new("Person", vec![Term::constant("bob")]),
]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = chase_with_config(
instance,
&[rule],
ChaseConfig {
semi_naive: true,
..Default::default()
},
);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 2);
}
// Skolem chase tests
#[test]
fn test_skolem_chase_terminates_with_existentials() {
let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = skolem_chase(instance, &[rule]);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 1);
assert!(matches!(has_ssn[0].terms[1], Term::Null(_)));
}
#[test]
fn test_skolem_chase_reuses_null_for_same_frontier_binding() {
// Two TGDs with the same frontier should produce nulls that the
// chase recognizes as the same value when Skolem is used.
let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
// Run twice on the same input: the Skolem null id should be stable
// within one chase run (re-application is idempotent).
let result1 = skolem_chase(instance.clone(), std::slice::from_ref(&rule));
let result2 = skolem_chase(instance, &[rule]);
let f1 = result1.instance.facts_for_predicate("HasSSN");
let f2 = result2.instance.facts_for_predicate("HasSSN");
assert_eq!(f1.len(), 1);
assert_eq!(f2.len(), 1);
// Same null id within a single run.
assert_eq!(f1[0].terms[1], f2[0].terms[1]);
}
#[test]
fn test_skolem_chase_distinct_frontiers_get_distinct_nulls() {
let instance: Instance = vec![
Atom::new("Person", vec![Term::constant("alice")]),
Atom::new("Person", vec![Term::constant("bob")]),
]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = skolem_chase(instance, &[rule]);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 2);
let nulls: Vec<_> = has_ssn.iter().map(|f| &f.terms[1]).collect();
assert_ne!(nulls[0], nulls[1]);
}
#[test]
fn test_skolem_chase_matches_restricted_for_datalog() {
let instance: Instance = vec![
Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]),
Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]),
]
.into_iter()
.collect();
let rule1 = RuleBuilder::new()
.when("Edge", vec![Term::var("X"), Term::var("Y")])
.then("Path", vec![Term::var("X"), Term::var("Y")])
.build();
let rule2 = RuleBuilder::new()
.when("Path", vec![Term::var("X"), Term::var("Y")])
.when("Edge", vec![Term::var("Y"), Term::var("Z")])
.then("Path", vec![Term::var("X"), Term::var("Z")])
.build();
let rules = vec![rule1, rule2];
let skolem = skolem_chase(instance.clone(), &rules);
let restricted = chase(instance, &rules);
assert!(skolem.terminated);
assert!(restricted.terminated);
assert_eq!(
skolem.instance.facts_for_predicate("Path").len(),
restricted.instance.facts_for_predicate("Path").len(),
);
}
#[test] #[test]
fn test_oblivious_chase_via_config() { fn test_oblivious_chase_via_config() {
let instance: Instance = vec![Atom::new("A", vec![Term::constant("x")])] let instance: Instance = vec![Atom::new("A", vec![Term::constant("x")])]

View File

@ -47,6 +47,67 @@ impl NullGenerator {
} }
} }
/// A deterministic null generator keyed on rule identity, the name of the
/// existential variable, and the frontier-variable bindings of the current
/// rule application.
///
/// The Skolem chase uses this so that the same rule application with the same
/// frontier bindings always yields the same labeled null, giving natural
/// termination for rules whose re-application would otherwise invent fresh
/// nulls forever.
#[derive(Debug, Default)]
pub(crate) struct SkolemGenerator {
counter: usize,
cache: HashMap<SkolemKey, Term>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SkolemKey {
rule_index: usize,
existential: String,
frontier_bindings: Vec<(String, Term)>,
}
impl SkolemGenerator {
pub(crate) fn seeded_from(instance: &Instance, rules: &[Rule]) -> Self {
Self {
counter: next_null_id(instance, rules),
cache: HashMap::new(),
}
}
pub(crate) fn skolem_for(
&mut self,
rule_index: usize,
rule: &Rule,
existential: &str,
subst: &Substitution,
) -> Term {
let frontier = rule.frontier_variables();
let mut bindings: Vec<_> = frontier
.into_iter()
.filter_map(|variable| subst.get(&variable).map(|term| (variable, term.clone())))
.collect();
bindings.sort_by(|left, right| left.0.cmp(&right.0));
let key = SkolemKey {
rule_index,
existential: existential.to_string(),
frontier_bindings: bindings,
};
if let Some(term) = self.cache.get(&key) {
return term.clone();
}
let id = self.counter;
self.counter += 1;
let term = Term::Null(id);
self.cache.insert(key, term.clone());
term
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct Trigger { pub(crate) struct Trigger {
rule_index: usize, rule_index: usize,
@ -159,6 +220,67 @@ pub fn find_matches(instance: &Instance, body: &[Atom]) -> Vec<Substitution> {
results results
} }
/// Compute body matches for a chase step, optionally using semi-naive
/// evaluation against a delta of facts added in the previous round.
///
/// When `delta` is `None`, this matches the body against the full instance.
/// When `delta` is `Some`, the result is the union over each body position `i`
/// of matches that bind position `i` against `delta` and the remaining
/// positions against the full instance. Body matches that do not involve any
/// fact from `delta` are skipped, since they would have been produced in an
/// earlier round.
pub(crate) fn find_matches_for_step(
instance: &Instance,
delta: Option<&Instance>,
body: &[Atom],
) -> Vec<Substitution> {
let Some(delta) = delta else {
return find_matches(instance, body);
};
if body.is_empty() {
// Rules with empty bodies fire once at round zero and have no
// body atom to anchor against `delta` afterwards.
return Vec::new();
}
let mut all = Vec::new();
for delta_index in 0..body.len() {
let mut results = vec![Substitution::new()];
for (position, body_atom) in body.iter().enumerate() {
let target = if position == delta_index {
delta
} else {
instance
};
let mut new_results = Vec::new();
for subst in &results {
let pattern = subst.apply_atom(body_atom);
for fact in target.facts_matching_pattern(&pattern) {
if let Some(next_subst) = unify_atom(&pattern, fact) {
let mut combined = subst.clone();
for (var, term) in next_subst.iter() {
combined.bind(var.clone(), term.clone());
}
new_results.push(combined);
}
}
}
results = new_results;
if results.is_empty() {
break;
}
}
all.extend(results);
}
all
}
impl MaterializedState { impl MaterializedState {
pub fn provenance_for(&self, atom: &Atom) -> Option<&Derivation> { pub fn provenance_for(&self, atom: &Atom) -> Option<&Derivation> {
self.provenance.get(atom) self.provenance.get(atom)
@ -194,6 +316,29 @@ pub(crate) fn apply_rule_head(
.collect() .collect()
} }
/// Like [`apply_rule_head`], but binds existential variables to deterministic
/// Skolem nulls based on `(rule_index, existential_name, frontier_bindings)`.
pub(crate) fn apply_rule_head_skolem(
rule_index: usize,
rule: &Rule,
subst: &Substitution,
skolem_gen: &mut SkolemGenerator,
) -> Vec<Atom> {
let mut extended_subst = subst.clone();
let mut existentials = rule.existential_variables().into_iter().collect::<Vec<_>>();
existentials.sort();
for variable in existentials {
let term = skolem_gen.skolem_for(rule_index, rule, &variable, subst);
extended_subst.bind(variable, term);
}
rule.head
.iter()
.map(|atom| extended_subst.apply_atom(atom))
.collect()
}
pub(crate) fn next_null_id(instance: &Instance, rules: &[Rule]) -> usize { pub(crate) fn next_null_id(instance: &Instance, rules: &[Rule]) -> usize {
let instance_max = instance let instance_max = instance
.iter() .iter()

View File

@ -13,7 +13,7 @@ mod engine;
pub use atom::Atom; pub use atom::Atom;
pub use engine::{ pub use engine::{
ChaseConfig, ChaseError, ChaseResult, ChaseVariant, chase, chase_full, chase_with_config, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, chase, chase_full, chase_with_config,
chase_with_egds, oblivious_chase, standard_chase, chase_with_egds, oblivious_chase, skolem_chase, standard_chase,
}; };
pub use inference::{Derivation, MaterializedState, find_matches, materialize}; pub use inference::{Derivation, MaterializedState, find_matches, materialize};
pub use instance::{Instance, InstanceError}; pub use instance::{Instance, InstanceError};

View File

@ -4,6 +4,7 @@
//! provides table scans. The built-in [`Instance`](crate::chase::Instance) //! provides table scans. The built-in [`Instance`](crate::chase::Instance)
//! adapter and the [`TableStore`] are the two provided implementations. //! adapter and the [`TableStore`] are the two provided implementations.
pub mod physical;
pub mod table_store; pub mod table_store;
use std::cmp::Ordering; use std::cmp::Ordering;
@ -14,6 +15,9 @@ use crate::chase::{Instance, Term};
use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey};
use crate::relational::{ResultSet, Row, Schema, Value}; use crate::relational::{ResultSet, Row, Schema, Value};
pub use physical::{
NamedPhysicalExpr, PhysicalPlan, execute_physical, plan_physical, rewrite_physical,
};
pub use table_store::TableStore; pub use table_store::TableStore;
/// Errors returned by the current logical-plan executor. /// Errors returned by the current logical-plan executor.

470
src/execution/physical.rs Normal file
View File

@ -0,0 +1,470 @@
//! Physical operator scaffolding.
//!
//! Logical plans describe *what* the query computes; physical plans describe
//! *how* it is executed. Today the physical layer mirrors the logical layer
//! one-to-one. The split exists so future work can add operator strategies
//! (for example, a hash join physical operator alongside the current
//! nested-loop join) without changing logical semantics.
//!
//! The current layer is intentionally narrow:
//!
//! - [`PhysicalPlan`] mirrors [`LogicalPlan`] with execution-oriented names.
//! - [`plan_physical`] converts a logical plan into a physical plan.
//! - [`rewrite_physical`] applies a small set of rule-based rewrites.
//! - [`execute_physical`] runs the physical plan against a [`DataSource`].
use std::cmp::Ordering;
use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey};
use crate::relational::{ResultSet, Row, Schema, Value};
use super::{DataSource, ExecutionError};
/// A physical plan node in the current execution subset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PhysicalPlan {
/// Sequentially scan all rows of a table from the data source.
SeqScan { table: String, schema: Schema },
/// Form the Cartesian product of two inputs by iterating the right
/// input for each row of the left input.
NestedLoopJoin {
left: Box<PhysicalPlan>,
right: Box<PhysicalPlan>,
schema: Schema,
},
/// Filter rows by a predicate expression.
Filter {
input: Box<PhysicalPlan>,
predicate: LogicalExpr,
},
/// Sort rows by one or more output columns.
Sort {
input: Box<PhysicalPlan>,
keys: Vec<SortKey>,
schema: Schema,
},
/// Project a new output schema by evaluating expressions per row.
Project {
input: Box<PhysicalPlan>,
expressions: Vec<NamedPhysicalExpr>,
schema: Schema,
},
/// Limit the number of output rows.
Limit {
input: Box<PhysicalPlan>,
count: usize,
},
}
/// A named physical expression in a projection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamedPhysicalExpr {
/// Output column name.
pub name: String,
/// Expression to evaluate.
pub expr: LogicalExpr,
}
impl PhysicalPlan {
/// Return the schema produced by this physical plan.
pub fn output_schema(&self) -> &Schema {
match self {
Self::SeqScan { schema, .. } => schema,
Self::NestedLoopJoin { schema, .. } => schema,
Self::Filter { input, .. } => input.output_schema(),
Self::Sort { schema, .. } => schema,
Self::Project { schema, .. } => schema,
Self::Limit { input, .. } => input.output_schema(),
}
}
}
/// Translate a [`LogicalPlan`] into a [`PhysicalPlan`].
///
/// This is currently a one-to-one mapping. Logical `Scan` becomes physical
/// `SeqScan`, logical `CrossJoin` becomes physical `NestedLoopJoin`, and
/// other operators keep their names. Future strategy choices belong here.
pub fn plan_physical(plan: &LogicalPlan) -> PhysicalPlan {
match plan {
LogicalPlan::Scan { table, schema } => PhysicalPlan::SeqScan {
table: table.clone(),
schema: schema.clone(),
},
LogicalPlan::CrossJoin {
left,
right,
schema,
} => PhysicalPlan::NestedLoopJoin {
left: Box::new(plan_physical(left)),
right: Box::new(plan_physical(right)),
schema: schema.clone(),
},
LogicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
input: Box::new(plan_physical(input)),
predicate: predicate.clone(),
},
LogicalPlan::Sort {
input,
keys,
schema,
} => PhysicalPlan::Sort {
input: Box::new(plan_physical(input)),
keys: keys.clone(),
schema: schema.clone(),
},
LogicalPlan::Project {
input,
expressions,
schema,
} => PhysicalPlan::Project {
input: Box::new(plan_physical(input)),
expressions: expressions
.iter()
.map(|named| NamedPhysicalExpr {
name: named.name.clone(),
expr: named.expr.clone(),
})
.collect(),
schema: schema.clone(),
},
LogicalPlan::Limit { input, count } => PhysicalPlan::Limit {
input: Box::new(plan_physical(input)),
count: *count,
},
}
}
/// Apply rule-based rewrites to a physical plan.
///
/// Today the only rewrite is `combine_adjacent_limits`, which collapses
/// `Limit(Limit(child, n), m)` into `Limit(child, min(n, m))`. Future
/// rewrites belong here as additional functions composed in this entry
/// point.
pub fn rewrite_physical(plan: PhysicalPlan) -> PhysicalPlan {
combine_adjacent_limits(plan)
}
fn combine_adjacent_limits(plan: PhysicalPlan) -> PhysicalPlan {
match plan {
PhysicalPlan::Limit { input, count } => {
let inner = combine_adjacent_limits(*input);
match inner {
PhysicalPlan::Limit {
input: child,
count: inner_count,
} => PhysicalPlan::Limit {
input: child,
count: count.min(inner_count),
},
other => PhysicalPlan::Limit {
input: Box::new(other),
count,
},
}
}
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => PhysicalPlan::NestedLoopJoin {
left: Box::new(combine_adjacent_limits(*left)),
right: Box::new(combine_adjacent_limits(*right)),
schema,
},
PhysicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
input: Box::new(combine_adjacent_limits(*input)),
predicate,
},
PhysicalPlan::Sort {
input,
keys,
schema,
} => PhysicalPlan::Sort {
input: Box::new(combine_adjacent_limits(*input)),
keys,
schema,
},
PhysicalPlan::Project {
input,
expressions,
schema,
} => PhysicalPlan::Project {
input: Box::new(combine_adjacent_limits(*input)),
expressions,
schema,
},
leaf @ PhysicalPlan::SeqScan { .. } => leaf,
}
}
/// Execute a physical plan against the provided data source.
pub fn execute_physical(
plan: &PhysicalPlan,
source: &dyn DataSource,
) -> Result<ResultSet, ExecutionError> {
match plan {
PhysicalPlan::SeqScan { table, schema } => source.scan(table, schema),
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => {
let left_result = execute_physical(left, source)?;
let right_result = execute_physical(right, source)?;
let mut rows = Vec::new();
for left_row in left_result.rows() {
for right_row in right_result.rows() {
let mut values = left_row.values().to_vec();
values.extend_from_slice(right_row.values());
rows.push(Row::new(values));
}
}
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Filter { input, predicate } => {
let result = execute_physical(input, source)?;
let filtered_rows = result
.rows()
.iter()
.filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false))
.cloned()
.collect();
Ok(ResultSet::new(result.schema().clone(), filtered_rows))
}
PhysicalPlan::Project {
input,
expressions,
schema,
} => {
let result = execute_physical(input, source)?;
let mut rows = Vec::new();
for row in result.rows() {
let values = expressions
.iter()
.map(|named| eval_expr(&named.expr, row, result.schema()))
.collect::<Result<Vec<_>, _>>()?;
rows.push(Row::new(values));
}
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Sort {
input,
keys,
schema,
} => {
let result = execute_physical(input, source)?;
let mut rows = result.rows().to_vec();
let resolved = resolve_sort_keys(keys, result.schema())?;
rows.sort_by(|left, right| compare_rows(left, right, &resolved));
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Limit { input, count } => {
let result = execute_physical(input, source)?;
let rows = result.rows().iter().take(*count).cloned().collect();
Ok(ResultSet::new(result.schema().clone(), rows))
}
}
}
fn eval_predicate(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result<bool, ExecutionError> {
match expr {
LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.unwrap_or(false)),
LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.map(|eq| !eq)
.unwrap_or(false)),
LogicalExpr::And(left, right) => {
Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?)
}
LogicalExpr::Or(left, right) => {
Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?)
}
_ => Ok(false),
}
}
fn eval_expr(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result<Value, ExecutionError> {
match expr {
LogicalExpr::Column(name) => {
let index = schema
.index_of(name)
.ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?;
Ok(row.get(index).cloned().unwrap_or(Value::Null))
}
LogicalExpr::Literal(value) => Ok(value.clone()),
LogicalExpr::Eq(left, right) => {
let left = eval_expr(left, row, schema)?;
let right = eval_expr(right, row, schema)?;
Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false)))
}
LogicalExpr::Ne(left, right) => {
let left = eval_expr(left, row, schema)?;
let right = eval_expr(right, row, schema)?;
Ok(Value::Boolean(
left.sql_eq(&right).map(|eq| !eq).unwrap_or(false),
))
}
LogicalExpr::And(left, right) => Ok(Value::Boolean(
eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?,
)),
LogicalExpr::Or(left, right) => Ok(Value::Boolean(
eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?,
)),
}
}
fn resolve_sort_keys(
keys: &[SortKey],
schema: &Schema,
) -> Result<Vec<(usize, SortDirection)>, ExecutionError> {
keys.iter()
.map(|key| {
let index = schema
.index_of(&key.column)
.ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?;
Ok((index, key.direction))
})
.collect()
}
fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering {
for (index, direction) in keys {
let left_value = left.get(*index).unwrap_or(&Value::Null);
let right_value = right.get(*index).unwrap_or(&Value::Null);
let ordering = compare_values(left_value, right_value);
if ordering != Ordering::Equal {
return match direction {
SortDirection::Asc => ordering,
SortDirection::Desc => ordering.reverse(),
};
}
}
Ordering::Equal
}
fn compare_values(left: &Value, right: &Value) -> Ordering {
match (left, right) {
(Value::Null, Value::Null) => Ordering::Equal,
(Value::Null, _) => Ordering::Greater,
(_, Value::Null) => Ordering::Less,
(Value::Text(left), Value::Text(right)) => left.cmp(right),
(Value::Integer(left), Value::Integer(right)) => left.cmp(right),
(Value::Boolean(left), Value::Boolean(right)) => left.cmp(right),
(Value::Integer(_), _) => Ordering::Less,
(_, Value::Integer(_)) => Ordering::Greater,
(Value::Text(_), Value::Boolean(_)) => Ordering::Less,
(Value::Boolean(_), Value::Text(_)) => Ordering::Greater,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::chase::{Atom, Instance, Term};
use crate::planner::logical::{LogicalPlan, NamedExpr};
use crate::relational::{DataType, Field};
fn parent_instance() -> Instance {
vec![
Atom::new(
"Parent",
vec![Term::constant("alice"), Term::constant("bob")],
),
Atom::new(
"Parent",
vec![Term::constant("bob"), Term::constant("carol")],
),
]
.into_iter()
.collect()
}
fn parent_schema() -> Schema {
Schema::new(vec![
Field::new("c0", DataType::Text, false),
Field::new("c1", DataType::Text, false),
])
}
#[test]
fn plan_physical_mirrors_logical_scan() {
let logical = LogicalPlan::Scan {
table: "Parent".to_string(),
schema: parent_schema(),
};
let physical = plan_physical(&logical);
assert!(matches!(physical, PhysicalPlan::SeqScan { .. }));
}
#[test]
fn execute_physical_runs_scan_and_limit() {
let schema = parent_schema();
let logical = LogicalPlan::Limit {
input: Box::new(LogicalPlan::Scan {
table: "Parent".to_string(),
schema: schema.clone(),
}),
count: 1,
};
let physical = plan_physical(&logical);
let result = execute_physical(&physical, &parent_instance()).unwrap();
assert_eq!(result.rows().len(), 1);
}
#[test]
fn rewrite_collapses_adjacent_limits() {
let schema = parent_schema();
let nested = PhysicalPlan::Limit {
input: Box::new(PhysicalPlan::Limit {
input: Box::new(PhysicalPlan::SeqScan {
table: "Parent".to_string(),
schema,
}),
count: 5,
}),
count: 2,
};
let rewritten = rewrite_physical(nested);
match rewritten {
PhysicalPlan::Limit { input, count } => {
assert_eq!(count, 2);
assert!(matches!(*input, PhysicalPlan::SeqScan { .. }));
}
other => panic!("expected single Limit, got {:?}", other),
}
}
#[test]
fn execute_physical_handles_projection_and_filter() {
let schema = parent_schema();
let logical = LogicalPlan::Project {
input: Box::new(LogicalPlan::Filter {
input: Box::new(LogicalPlan::Scan {
table: "Parent".to_string(),
schema: schema.clone(),
}),
predicate: LogicalExpr::Eq(
Box::new(LogicalExpr::Column("c1".to_string())),
Box::new(LogicalExpr::Literal(Value::text("bob"))),
),
}),
expressions: vec![NamedExpr {
name: "c0".to_string(),
expr: LogicalExpr::Column("c0".to_string()),
}],
schema: Schema::new(vec![Field::new("c0", DataType::Text, false)]),
};
let physical = rewrite_physical(plan_physical(&logical));
let result = execute_physical(&physical, &parent_instance()).unwrap();
assert_eq!(result.rows().len(), 1);
assert_eq!(result.rows()[0].values()[0], Value::text("alice"));
}
}

View File

@ -18,5 +18,5 @@ pub mod sql;
// Lower-level reasoning and provenance APIs remain under `query_engine::chase`. // Lower-level reasoning and provenance APIs remain under `query_engine::chase`.
pub use chase::{ pub use chase::{
Atom, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, Instance, Rule, RuleBuilder, Term, Atom, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, Instance, Rule, RuleBuilder, Term,
chase, chase_with_config, oblivious_chase, standard_chase, chase, chase_with_config, oblivious_chase, skolem_chase, standard_chase,
}; };