Compare commits

...

2 Commits

Author SHA1 Message Date
Hassan Abedi
5b206bfc23 Add filter push-down and SQL aggregation/GROUP BY 2026-04-13 13:54:40 +02:00
Hassan Abedi
177cee7044 Add Skolem chase and semi-naive evaluation support 2026-04-13 13:54:36 +02:00
18 changed files with 2331 additions and 37 deletions

View File

@ -59,7 +59,7 @@ Quick examples:
- `src/catalog/`: predicate-to-table schema inference and catalog access. - `src/catalog/`: predicate-to-table schema inference and catalog access.
- `src/sql/`: narrow SQL AST and parser support. - `src/sql/`: narrow SQL AST and parser support.
- `src/planner/`: logical plan structures and SQL-to-plan translation. - `src/planner/`: logical plan structures and SQL-to-plan translation.
- `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait and the `TableStore` in-memory source. - `src/execution/`: execution of the current logical plan subset, including the `DataSource` trait, the `TableStore` in-memory source, and the physical operator layer in `physical.rs` with rule-based rewrites.
- `examples/scripts/`: runnable script examples for supported workflows. - `examples/scripts/`: runnable script examples for supported workflows.
- `tests/`: integration, regression, and property-based tests. - `tests/`: integration, regression, and property-based tests.
@ -71,7 +71,7 @@ Quick examples:
- The chase engine should remain largely stateless; pass execution state explicitly. - The chase engine should remain largely stateless; pass execution state explicitly.
- New chase variants should be composable with existing infrastructure. - New chase variants should be composable with existing infrastructure.
- Existential variables generate labeled nulls (`Term::Null`). - Existential variables generate labeled nulls (`Term::Null`).
- The current SQL support is intentionally narrow: `SELECT-FROM-WHERE-ORDER BY-LIMIT` over predicate-backed tables; equality and inequality predicates combined with `AND` and `OR`; comma-join style multi-table queries; table aliases; ordering by output-column names; integer and string literals. - The current SQL support is intentionally narrow: `SELECT-FROM-WHERE-GROUP BY-ORDER BY-LIMIT` over predicate-backed tables; equality and inequality predicates combined with `AND` and `OR`; comma-join style multi-table queries; table aliases; ordering by output-column names; integer and string literals; `COUNT`, `SUM`, `MIN`, `MAX`, and `AVG` aggregates with optional `GROUP BY`.
- Stable SQL column names come from explicit catalog registration or the frontend `schema ...` command, including for empty tables; otherwise the default names are positional such as `c0` and `c1`. - Stable SQL column names come from explicit catalog registration or the frontend `schema ...` command, including for empty tables; otherwise the default names are positional such as `c0` and `c1`.
- Single-table SQL queries may use the table name as a qualifier when no alias is present. - Single-table SQL queries may use the table name as a qualifier when no alias is present.
- Do not describe unsupported SQL features such as aggregates, grouping, or arbitrary expressions as implemented. - Do not describe unsupported SQL features such as aggregates, grouping, or arbitrary expressions as implemented.

View File

@ -34,6 +34,15 @@ binaries = []
[dev-dependencies] [dev-dependencies]
proptest = "1.6" proptest = "1.6"
criterion = { version = "0.5", default-features = false }
[[bench]]
name = "chase"
harness = false
[[bench]]
name = "sql"
harness = false
[profile.release] [profile.release]
strip = "debuginfo" strip = "debuginfo"

View File

@ -10,11 +10,14 @@ execution boundaries.
### Current scope ### Current scope
- Chase-based rule evaluation over facts, rules, and substitutions - Chase-based rule evaluation over facts, rules, and substitutions
- Restricted-chase style materialization with active-trigger checks - Restricted, standard, oblivious, and Skolem chase variants
- Optional semi-naive evaluation across all chase variants
- Provenance-oriented explanations for derived answers - Provenance-oriented explanations for derived answers
- Script, REPL, and local web UI for experimentation - Script, REPL, and local web UI for experimentation
- Relational schema, catalog, logical-plan, and execution scaffolding - Relational schema, catalog, logical-plan, and execution scaffolding
- A minimal SQL slice for `SELECT-FROM-WHERE-ORDER BY-LIMIT` queries over predicate-backed tables - Physical operator scaffolding with a small rule-based rewrite layer
- A minimal SQL slice for `SELECT-FROM-WHERE-GROUP BY-ORDER BY-LIMIT` queries over predicate-backed tables, including `COUNT`, `SUM`, `MIN`, `MAX`, and `AVG` aggregates
- Filter push-down across joins in the physical rewrite pass
### Architecture ### Architecture
@ -26,7 +29,7 @@ The repository is currently organized around a few clear subsystems:
- `src/catalog/`: predicate-backed table metadata - `src/catalog/`: predicate-backed table metadata
- `src/sql/`: minimal SQL AST and parser - `src/sql/`: minimal SQL AST and parser
- `src/planner/`: logical plan structures and SQL-to-plan translation - `src/planner/`: logical plan structures and SQL-to-plan translation
- `src/execution/`: execution for the current logical-plan subset, `DataSource` trait, and `TableStore` - `src/execution/`: execution for the current logical-plan subset, the `DataSource` trait, the `TableStore`, and a physical operator layer with rule-based rewrites
Today, the chase subsystem is still the most mature part of the codebase. The Today, the chase subsystem is still the most mature part of the codebase. The
relational and SQL modules are present to create clean extension points for a relational and SQL modules are present to create clean extension points for a
@ -141,6 +144,8 @@ WHERE Parent.child = Ancestor.parent
SELECT p.parent, q.child SELECT p.parent, q.child
FROM Parent AS p, Parent AS q FROM Parent AS p, Parent AS q
WHERE p.child = q.parent WHERE p.child = q.parent
SELECT COUNT(*) FROM Parent
SELECT dept, COUNT(*), SUM(salary) FROM Emp GROUP BY dept
``` ```
In the REPL or script runner, use the `sql` command and end the statement with In the REPL or script runner, use the `sql` command and end the statement with
@ -189,7 +194,7 @@ Current limits:
- `ORDER BY` supports output-column ordering with `ASC`/`DESC` - `ORDER BY` supports output-column ordering with `ASC`/`DESC`
- `LIMIT` restricts the number of output rows - `LIMIT` restricts the number of output rows
- literals include strings, integers, and `NULL` - literals include strings, integers, and `NULL`
- no aggregates - aggregates: `COUNT(*)`, `COUNT(col)`, `SUM`, `MIN`, `MAX`, `AVG`, with optional `GROUP BY`
- projection aliases only via `AS` - projection aliases only via `AS`
Runnable SQL examples: Runnable SQL examples:
@ -210,6 +215,12 @@ cargo clippy --all-targets --all-features -- -D warnings
cargo fmt --check cargo fmt --check
``` ```
Benchmarks live under `benches/` and can be run with:
```bash
cargo bench
```
### Notes ### Notes
This repository is still centered on a rule-engine core. The new SQL-related This repository is still centered on a rule-engine core. The new SQL-related

View File

@ -35,6 +35,8 @@ This document tracks the current state and next steps for the repository.
- [x] `!=`/`<>` inequality and `OR` disjunction in `WHERE` clauses - [x] `!=`/`<>` inequality and `OR` disjunction in `WHERE` clauses
- [x] `LIMIT` clause for restricting output row count - [x] `LIMIT` clause for restricting output row count
- [x] Integer literal and `DataType::Integer` support - [x] Integer literal and `DataType::Integer` support
- [x] `COUNT`, `SUM`, `MIN`, `MAX`, `AVG` aggregates with `GROUP BY`
- [x] Filter push-down rewrite across `NestedLoopJoin` in the physical layer
### Near-Term Cleanup ### Near-Term Cleanup
@ -65,9 +67,9 @@ This document tracks the current state and next steps for the repository.
### Execution and Optimization ### Execution and Optimization
- [ ] Introduce physical operator abstractions - [x] Introduce physical operator abstractions
- [x] Add a planning step from logical operators to executable operators - [x] Add a planning step from logical operators to executable operators
- [ ] Add basic rule-based logical rewrites - [x] Add basic rule-based logical rewrites
- [ ] Add statistics and cost-model scaffolding - [ ] Add statistics and cost-model scaffolding
- [ ] Add indexing and access-path abstractions - [ ] Add indexing and access-path abstractions
@ -76,13 +78,13 @@ This document tracks the current state and next steps for the repository.
- [x] Restricted chase - [x] Restricted chase
- [x] Standard chase - [x] Standard chase
- [x] Oblivious chase - [x] Oblivious chase
- [ ] Skolem chase - [x] Skolem chase
- [ ] Core chase - [ ] Core chase
- [ ] Negative constraints - [ ] Negative constraints
- [ ] Stratified negation in rule bodies - [ ] Stratified negation in rule bodies
- [ ] Disjunctive heads - [ ] Disjunctive heads
- [ ] Aggregation support in rule evaluation - [ ] Aggregation support in rule evaluation (available in SQL; not yet exposed to chase rules)
- [ ] Semi-naive evaluation - [x] Semi-naive evaluation
- [ ] Termination analysis helpers - [ ] Termination analysis helpers
### Data and Interoperability ### Data and Interoperability
@ -95,7 +97,7 @@ This document tracks the current state and next steps for the repository.
### Performance and Reliability ### Performance and Reliability
- [ ] Predicate indexing for fact lookup - [x] Predicate indexing for fact lookup
- [ ] Incremental evaluation - [ ] Incremental evaluation
- [ ] Benchmarks - [ ] Benchmarks
- [ ] Fuzzing - [ ] Fuzzing
@ -108,6 +110,6 @@ This document tracks the current state and next steps for the repository.
- [x] Property-based tests - [x] Property-based tests
- [x] Regression tests - [x] Regression tests
- [x] Initial SQL pipeline tests - [x] Initial SQL pipeline tests
- [ ] Benchmark coverage - [x] Benchmark coverage (chase and SQL pipeline via `cargo bench`)
- [ ] Snapshot-style frontend tests - [ ] Snapshot-style frontend tests
- [ ] More planner/executor tests as those layers are added - [ ] More planner/executor tests as those layers are added

98
benches/chase.rs Normal file
View File

@ -0,0 +1,98 @@
//! Benchmarks for the chase subsystem.
//!
//! These are designed to retroactively validate the semi-naive and Skolem
//! work and catch future regressions. Each workload runs several chase
//! variants over the same input so relative numbers are meaningful.
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use query_engine::chase::rule::RuleBuilder;
use query_engine::chase::{ChaseConfig, ChaseVariant, Rule, chase_with_config};
use query_engine::{Atom, Instance, Term};
fn chain_edges(n: usize) -> Instance {
(0..n)
.map(|i| {
Atom::new(
"Edge",
vec![
Term::constant(format!("n{}", i)),
Term::constant(format!("n{}", i + 1)),
],
)
})
.collect()
}
fn transitive_closure_rules() -> Vec<Rule> {
let edge_to_path = RuleBuilder::new()
.when("Edge", vec![Term::var("X"), Term::var("Y")])
.then("Path", vec![Term::var("X"), Term::var("Y")])
.build();
let extend_path = RuleBuilder::new()
.when("Path", vec![Term::var("X"), Term::var("Y")])
.when("Edge", vec![Term::var("Y"), Term::var("Z")])
.then("Path", vec![Term::var("X"), Term::var("Z")])
.build();
vec![edge_to_path, extend_path]
}
fn bench_transitive_closure(c: &mut Criterion) {
let mut group = c.benchmark_group("transitive_closure_chain_20");
let instance = chain_edges(20);
let rules = transitive_closure_rules();
for (label, variant, semi) in [
("restricted_naive", ChaseVariant::Restricted, false),
("restricted_semi_naive", ChaseVariant::Restricted, true),
("standard_naive", ChaseVariant::Standard, false),
("standard_semi_naive", ChaseVariant::Standard, true),
] {
let config = ChaseConfig {
variant,
semi_naive: semi,
..Default::default()
};
group.bench_function(label, |b| {
b.iter_batched(
|| instance.clone(),
|inst| chase_with_config(inst, &rules, config.clone()),
BatchSize::SmallInput,
);
});
}
group.finish();
}
fn bench_existentials(c: &mut Criterion) {
let mut group = c.benchmark_group("existentials_50_people");
let instance: Instance = (0..50)
.map(|i| Atom::new("Person", vec![Term::constant(format!("p{}", i))]))
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasId", vec![Term::var("X"), Term::var("Y")])
.build();
let rules = vec![rule];
for (label, variant) in [
("restricted", ChaseVariant::Restricted),
("skolem", ChaseVariant::Skolem),
] {
let config = ChaseConfig {
variant,
semi_naive: false,
..Default::default()
};
group.bench_function(label, |b| {
b.iter_batched(
|| instance.clone(),
|inst| chase_with_config(inst, &rules, config.clone()),
BatchSize::SmallInput,
);
});
}
group.finish();
}
criterion_group!(benches, bench_transitive_closure, bench_existentials);
criterion_main!(benches);

115
benches/sql.rs Normal file
View File

@ -0,0 +1,115 @@
//! Benchmarks for the SQL pipeline.
//!
//! Focus areas: scans, single-column filters, multi-table joins with and
//! without filter push-down, and GROUP BY aggregation.
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use query_engine::catalog::PredicateCatalog;
use query_engine::execution::TableStore;
use query_engine::execution::execute;
use query_engine::execution::physical::{execute_physical, plan_physical, rewrite_physical};
use query_engine::planner::sql::plan_select;
use query_engine::relational::{DataType, Field, Row, Schema, Value};
use query_engine::sql::parser::parse_select;
use query_engine::{Atom, Instance, Term};
fn edges_instance(n: usize) -> Instance {
(0..n)
.map(|i| {
Atom::new(
"L",
vec![
Term::constant(format!("a{}", i)),
Term::constant(format!("b{}", i)),
],
)
})
.chain((0..n).map(|i| {
Atom::new(
"R",
vec![
Term::constant(format!("b{}", i)),
Term::constant(format!("c{}", i)),
],
)
}))
.collect()
}
fn bench_filter_pushdown_join(c: &mut Criterion) {
let mut group = c.benchmark_group("filter_pushdown_join_100");
let instance = edges_instance(100);
let mut catalog = PredicateCatalog::from_instance(&instance).unwrap();
catalog.rename_columns("L", ["a", "b"]).unwrap();
catalog.rename_columns("R", ["b", "c"]).unwrap();
let select = parse_select("SELECT L.a, R.c FROM L, R WHERE L.b = R.b AND L.a = 'a42'").unwrap();
let logical = plan_select(&select, &catalog).unwrap();
group.bench_function("logical_direct_execute", |b| {
b.iter(|| execute(&logical, &instance).unwrap());
});
let physical_raw = plan_physical(&logical);
group.bench_function("physical_no_rewrite", |b| {
b.iter(|| execute_physical(&physical_raw, &instance).unwrap());
});
let physical_rewritten = rewrite_physical(plan_physical(&logical));
group.bench_function("physical_with_pushdown", |b| {
b.iter(|| execute_physical(&physical_rewritten, &instance).unwrap());
});
group.finish();
}
fn bench_group_by_aggregation(c: &mut Criterion) {
let mut group = c.benchmark_group("group_by_aggregation_1000");
let schema = Schema::new(vec![
Field::new("dept", DataType::Text, false),
Field::new("salary", DataType::Integer, false),
]);
let mut store = TableStore::new();
let rows: Vec<Row> = (0..1000)
.map(|i| {
let dept = format!("d{}", i % 10);
Row::new(vec![Value::text(dept), Value::Integer((i as i64) * 10)])
})
.collect();
store.insert("Emp", schema.clone(), rows);
let mut catalog = PredicateCatalog::new();
catalog.register_table("Emp", schema);
let select =
parse_select("SELECT dept, COUNT(*), SUM(salary), AVG(salary) FROM Emp GROUP BY dept")
.unwrap();
let logical = plan_select(&select, &catalog).unwrap();
group.bench_function("logical_direct", |b| {
b.iter_batched(
|| (),
|_| execute(&logical, &store).unwrap(),
BatchSize::SmallInput,
);
});
let physical = rewrite_physical(plan_physical(&logical));
group.bench_function("physical", |b| {
b.iter_batched(
|| (),
|_| execute_physical(&physical, &store).unwrap(),
BatchSize::SmallInput,
);
});
group.finish();
}
criterion_group!(
benches,
bench_filter_pushdown_join,
bench_group_by_aggregation
);
criterion_main!(benches);

View File

@ -5,7 +5,10 @@ use std::error::Error;
use std::fmt; use std::fmt;
use super::atom::Atom; use super::atom::Atom;
use super::inference::{NullGenerator, Trigger, apply_rule_head, find_matches, head_is_satisfied}; use super::inference::{
NullGenerator, SkolemGenerator, Trigger, apply_rule_head, apply_rule_head_skolem, find_matches,
find_matches_for_step, head_is_satisfied,
};
use super::instance::Instance; use super::instance::Instance;
use super::rule::{Egd, Rule}; use super::rule::{Egd, Rule};
use super::substitution::Substitution; use super::substitution::Substitution;
@ -70,6 +73,13 @@ pub enum ChaseVariant {
/// variables this variant will typically not terminate (it will hit the /// variables this variant will typically not terminate (it will hit the
/// step limit) because each application generates fresh nulls. /// step limit) because each application generates fresh nulls.
Oblivious, Oblivious,
/// Skolem chase: fires every matching rule application, like oblivious,
/// but binds each existential variable to a deterministic (skolem) null
/// keyed on the rule identity, the existential variable, and the
/// frontier-variable bindings. Re-application with the same frontier
/// bindings produces the same head atom, so the chase terminates for many
/// schemas where the oblivious variant does not.
Skolem,
} }
/// Configuration for the chase algorithm. /// Configuration for the chase algorithm.
@ -79,6 +89,11 @@ pub struct ChaseConfig {
pub max_steps: usize, pub max_steps: usize,
/// The chase variant to use. /// The chase variant to use.
pub variant: ChaseVariant, pub variant: ChaseVariant,
/// When true, the chase uses semi-naive evaluation: each round only
/// considers rule applications that involve at least one fact added in
/// the previous round, instead of re-matching the entire instance. This
/// is a pure performance switch and does not change the chase result.
pub semi_naive: bool,
} }
impl Default for ChaseConfig { impl Default for ChaseConfig {
@ -86,6 +101,7 @@ impl Default for ChaseConfig {
ChaseConfig { ChaseConfig {
max_steps: 10_000, max_steps: 10_000,
variant: ChaseVariant::default(), variant: ChaseVariant::default(),
semi_naive: false,
} }
} }
} }
@ -138,6 +154,22 @@ pub fn oblivious_chase(instance: Instance, rules: &[Rule]) -> ChaseResult {
chase_with_config(instance, rules, config) chase_with_config(instance, rules, config)
} }
/// Run the Skolem chase algorithm.
///
/// The Skolem chase fires every matching rule application, like the
/// oblivious variant, but binds each existential variable to a deterministic
/// null keyed on the rule, the variable, and the frontier-variable bindings.
/// Re-application with the same frontier bindings reuses the same null, so
/// the chase terminates whenever the set of derivable facts is finite, even
/// in the presence of existentials.
pub fn skolem_chase(instance: Instance, rules: &[Rule]) -> ChaseResult {
let config = ChaseConfig {
variant: ChaseVariant::Skolem,
..Default::default()
};
chase_with_config(instance, rules, config)
}
/// Run the chase with custom configuration. /// Run the chase with custom configuration.
pub fn chase_with_config( pub fn chase_with_config(
mut instance: Instance, mut instance: Instance,
@ -145,8 +177,16 @@ pub fn chase_with_config(
config: ChaseConfig, config: ChaseConfig,
) -> ChaseResult { ) -> ChaseResult {
let mut null_gen = NullGenerator::seeded_from(&instance, rules); let mut null_gen = NullGenerator::seeded_from(&instance, rules);
let mut skolem_gen = SkolemGenerator::seeded_from(&instance, rules);
let mut applied_triggers: HashSet<Trigger> = HashSet::new(); let mut applied_triggers: HashSet<Trigger> = HashSet::new();
let mut steps = 0; let mut steps = 0;
// For semi-naive evaluation: at round zero every fact is "new", so the
// delta starts as the full input instance.
let mut delta_owned: Option<Instance> = if config.semi_naive {
Some(instance.clone())
} else {
None
};
loop { loop {
if steps >= config.max_steps { if steps >= config.max_steps {
@ -158,12 +198,18 @@ pub fn chase_with_config(
}; };
} }
let delta = delta_owned.as_ref();
let new_facts = match config.variant { let new_facts = match config.variant {
ChaseVariant::Standard => standard_chase_step(&instance, rules, &mut null_gen), ChaseVariant::Standard => standard_chase_step(&instance, delta, rules, &mut null_gen),
ChaseVariant::Restricted => { ChaseVariant::Restricted => restricted_chase_step(
restricted_chase_step(&instance, rules, &mut null_gen, &mut applied_triggers) &instance,
} delta,
ChaseVariant::Oblivious => oblivious_chase_step(&instance, rules, &mut null_gen), rules,
&mut null_gen,
&mut applied_triggers,
),
ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, rules, &mut null_gen),
ChaseVariant::Skolem => skolem_chase_step(&instance, delta, rules, &mut skolem_gen),
}; };
if new_facts.is_empty() { if new_facts.is_empty() {
@ -176,9 +222,18 @@ pub fn chase_with_config(
}; };
} }
let mut next_delta = if config.semi_naive {
Some(Instance::new())
} else {
None
};
for fact in new_facts { for fact in new_facts {
instance.add(fact); let added = instance.add(fact.clone());
if added && let Some(next) = next_delta.as_mut() {
next.add(fact);
}
} }
delta_owned = next_delta;
steps += 1; steps += 1;
} }
} }
@ -186,6 +241,7 @@ pub fn chase_with_config(
/// Perform a single standard chase step: apply rules without trigger tracking. /// Perform a single standard chase step: apply rules without trigger tracking.
fn standard_chase_step( fn standard_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
) -> Vec<Atom> { ) -> Vec<Atom> {
@ -193,7 +249,7 @@ fn standard_chase_step(
for rule in rules { for rule in rules {
// Find all ways to match the rule body against the instance // Find all ways to match the rule body against the instance
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
// In standard chase, we only check if head is satisfied // In standard chase, we only check if head is satisfied
@ -218,6 +274,7 @@ fn standard_chase_step(
/// Perform a single restricted chase step: use trigger tracking to avoid redundant applications. /// Perform a single restricted chase step: use trigger tracking to avoid redundant applications.
fn restricted_chase_step( fn restricted_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
applied_triggers: &mut HashSet<Trigger>, applied_triggers: &mut HashSet<Trigger>,
@ -226,7 +283,7 @@ fn restricted_chase_step(
for (rule_idx, rule) in rules.iter().enumerate() { for (rule_idx, rule) in rules.iter().enumerate() {
// Find all ways to match the rule body against the instance // Find all ways to match the rule body against the instance
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
// Create a trigger to check if we've already applied this // Create a trigger to check if we've already applied this
@ -262,13 +319,14 @@ fn restricted_chase_step(
/// without checking head satisfaction or tracking triggers. /// without checking head satisfaction or tracking triggers.
fn oblivious_chase_step( fn oblivious_chase_step(
instance: &Instance, instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule], rules: &[Rule],
null_gen: &mut NullGenerator, null_gen: &mut NullGenerator,
) -> Vec<Atom> { ) -> Vec<Atom> {
let mut new_facts = Vec::new(); let mut new_facts = Vec::new();
for rule in rules { for rule in rules {
let matches = find_matches(instance, &rule.body); let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches { for subst in matches {
let derived = apply_rule_head(rule, &subst, null_gen); let derived = apply_rule_head(rule, &subst, null_gen);
@ -284,6 +342,36 @@ fn oblivious_chase_step(
new_facts new_facts
} }
/// Perform a single Skolem chase step: fire all matching rule applications
/// using deterministic skolem nulls for existential variables. Natural
/// termination comes from the fact that a rule re-applied with the same
/// frontier bindings produces the same head atom, which is already in the
/// instance.
fn skolem_chase_step(
instance: &Instance,
delta: Option<&Instance>,
rules: &[Rule],
skolem_gen: &mut SkolemGenerator,
) -> Vec<Atom> {
let mut new_facts = Vec::new();
for (rule_index, rule) in rules.iter().enumerate() {
let matches = find_matches_for_step(instance, delta, &rule.body);
for subst in matches {
let derived = apply_rule_head_skolem(rule_index, rule, &subst, skolem_gen);
for fact in derived {
if !instance.contains(&fact) {
new_facts.push(fact);
}
}
}
}
new_facts
}
/// A trigger for EGD applications, tracking which EGD was applied with which body bindings. /// A trigger for EGD applications, tracking which EGD was applied with which body bindings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct EgdTrigger { struct EgdTrigger {
@ -361,10 +449,19 @@ pub fn chase_full(
config: ChaseConfig, config: ChaseConfig,
) -> ChaseResult { ) -> ChaseResult {
let mut null_gen = NullGenerator::seeded_from(&instance, tgds); let mut null_gen = NullGenerator::seeded_from(&instance, tgds);
let mut skolem_gen = SkolemGenerator::seeded_from(&instance, tgds);
let mut applied_triggers: HashSet<Trigger> = HashSet::new(); let mut applied_triggers: HashSet<Trigger> = HashSet::new();
let mut applied_egd_triggers: HashSet<EgdTrigger> = HashSet::new(); let mut applied_egd_triggers: HashSet<EgdTrigger> = HashSet::new();
let mut uf = UnionFind::new(); let mut uf = UnionFind::new();
let mut steps = 0; let mut steps = 0;
// Semi-naive delta: starts as the full instance so the first round
// matches against everything and is empty after canonicalization
// changes (which can rewrite the whole instance).
let mut delta_owned: Option<Instance> = if config.semi_naive {
Some(instance.clone())
} else {
None
};
loop { loop {
if steps >= config.max_steps { if steps >= config.max_steps {
@ -377,17 +474,27 @@ pub fn chase_full(
} }
// Apply TGDs // Apply TGDs
let delta = delta_owned.as_ref();
let new_facts = match config.variant { let new_facts = match config.variant {
ChaseVariant::Standard => standard_chase_step(&instance, tgds, &mut null_gen), ChaseVariant::Standard => standard_chase_step(&instance, delta, tgds, &mut null_gen),
ChaseVariant::Restricted => { ChaseVariant::Restricted => {
restricted_chase_step(&instance, tgds, &mut null_gen, &mut applied_triggers) restricted_chase_step(&instance, delta, tgds, &mut null_gen, &mut applied_triggers)
} }
ChaseVariant::Oblivious => oblivious_chase_step(&instance, tgds, &mut null_gen), ChaseVariant::Oblivious => oblivious_chase_step(&instance, delta, tgds, &mut null_gen),
ChaseVariant::Skolem => skolem_chase_step(&instance, delta, tgds, &mut skolem_gen),
}; };
let tgd_changes = !new_facts.is_empty(); let tgd_changes = !new_facts.is_empty();
let mut next_delta = if config.semi_naive {
Some(Instance::new())
} else {
None
};
for fact in new_facts { for fact in new_facts {
instance.add(fact); let added = instance.add(fact.clone());
if added && let Some(next) = next_delta.as_mut() {
next.add(fact);
}
} }
// Apply EGDs // Apply EGDs
@ -406,6 +513,13 @@ pub fn chase_full(
// Canonicalize instance if EGDs made changes // Canonicalize instance if EGDs made changes
if egd_changes { if egd_changes {
instance = instance.canonicalize(&mut uf); instance = instance.canonicalize(&mut uf);
// Canonicalization can rewrite any atom in the instance,
// so the previously computed delta is no longer a sound
// approximation of "new" facts. Reset to the full
// instance for the next round.
if config.semi_naive {
next_delta = Some(instance.clone());
}
} }
// Check for fixpoint // Check for fixpoint
@ -420,6 +534,7 @@ pub fn chase_full(
} }
} }
delta_owned = next_delta;
steps += 1; steps += 1;
} }
} }
@ -842,6 +957,7 @@ mod tests {
let config = ChaseConfig { let config = ChaseConfig {
max_steps: 100, max_steps: 100,
variant: ChaseVariant::Standard, variant: ChaseVariant::Standard,
semi_naive: false,
}; };
let result = chase_full(instance, &[tgd], &[], config); let result = chase_full(instance, &[tgd], &[], config);
@ -934,6 +1050,7 @@ mod tests {
let config = ChaseConfig { let config = ChaseConfig {
max_steps: 10, max_steps: 10,
variant: ChaseVariant::Oblivious, variant: ChaseVariant::Oblivious,
semi_naive: false,
}; };
let result = chase_with_config(instance, &[rule], config); let result = chase_with_config(instance, &[rule], config);
@ -943,6 +1060,181 @@ mod tests {
assert!(result.instance.facts_for_predicate("HasSSN").len() > 1); assert!(result.instance.facts_for_predicate("HasSSN").len() > 1);
} }
// Semi-naive evaluation tests
#[test]
fn test_semi_naive_matches_naive_for_transitive_closure() {
let instance: Instance = vec![
Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]),
Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]),
Atom::new("Edge", vec![Term::constant("c"), Term::constant("d")]),
Atom::new("Edge", vec![Term::constant("d"), Term::constant("e")]),
]
.into_iter()
.collect();
let rule1 = RuleBuilder::new()
.when("Edge", vec![Term::var("X"), Term::var("Y")])
.then("Path", vec![Term::var("X"), Term::var("Y")])
.build();
let rule2 = RuleBuilder::new()
.when("Path", vec![Term::var("X"), Term::var("Y")])
.when("Edge", vec![Term::var("Y"), Term::var("Z")])
.then("Path", vec![Term::var("X"), Term::var("Z")])
.build();
let rules = vec![rule1, rule2];
let naive = chase(instance.clone(), &rules);
let semi_naive = chase_with_config(
instance,
&rules,
ChaseConfig {
semi_naive: true,
..Default::default()
},
);
assert!(naive.terminated);
assert!(semi_naive.terminated);
let naive_paths = naive.instance.facts_for_predicate("Path");
let semi_paths = semi_naive.instance.facts_for_predicate("Path");
assert_eq!(naive_paths.len(), semi_paths.len());
// 4-node chain should yield 4 + 3 + 2 + 1 = 10 paths.
assert_eq!(semi_paths.len(), 10);
}
#[test]
fn test_semi_naive_handles_existentials() {
let instance: Instance = vec![
Atom::new("Person", vec![Term::constant("alice")]),
Atom::new("Person", vec![Term::constant("bob")]),
]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = chase_with_config(
instance,
&[rule],
ChaseConfig {
semi_naive: true,
..Default::default()
},
);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 2);
}
// Skolem chase tests
#[test]
fn test_skolem_chase_terminates_with_existentials() {
let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = skolem_chase(instance, &[rule]);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 1);
assert!(matches!(has_ssn[0].terms[1], Term::Null(_)));
}
#[test]
fn test_skolem_chase_reuses_null_for_same_frontier_binding() {
// Two TGDs with the same frontier should produce nulls that the
// chase recognizes as the same value when Skolem is used.
let instance: Instance = vec![Atom::new("Person", vec![Term::constant("alice")])]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
// Run twice on the same input: the Skolem null id should be stable
// within one chase run (re-application is idempotent).
let result1 = skolem_chase(instance.clone(), std::slice::from_ref(&rule));
let result2 = skolem_chase(instance, &[rule]);
let f1 = result1.instance.facts_for_predicate("HasSSN");
let f2 = result2.instance.facts_for_predicate("HasSSN");
assert_eq!(f1.len(), 1);
assert_eq!(f2.len(), 1);
// Same null id within a single run.
assert_eq!(f1[0].terms[1], f2[0].terms[1]);
}
#[test]
fn test_skolem_chase_distinct_frontiers_get_distinct_nulls() {
let instance: Instance = vec![
Atom::new("Person", vec![Term::constant("alice")]),
Atom::new("Person", vec![Term::constant("bob")]),
]
.into_iter()
.collect();
let rule = RuleBuilder::new()
.when("Person", vec![Term::var("X")])
.then("HasSSN", vec![Term::var("X"), Term::var("Y")])
.build();
let result = skolem_chase(instance, &[rule]);
assert!(result.terminated);
let has_ssn = result.instance.facts_for_predicate("HasSSN");
assert_eq!(has_ssn.len(), 2);
let nulls: Vec<_> = has_ssn.iter().map(|f| &f.terms[1]).collect();
assert_ne!(nulls[0], nulls[1]);
}
#[test]
fn test_skolem_chase_matches_restricted_for_datalog() {
let instance: Instance = vec![
Atom::new("Edge", vec![Term::constant("a"), Term::constant("b")]),
Atom::new("Edge", vec![Term::constant("b"), Term::constant("c")]),
]
.into_iter()
.collect();
let rule1 = RuleBuilder::new()
.when("Edge", vec![Term::var("X"), Term::var("Y")])
.then("Path", vec![Term::var("X"), Term::var("Y")])
.build();
let rule2 = RuleBuilder::new()
.when("Path", vec![Term::var("X"), Term::var("Y")])
.when("Edge", vec![Term::var("Y"), Term::var("Z")])
.then("Path", vec![Term::var("X"), Term::var("Z")])
.build();
let rules = vec![rule1, rule2];
let skolem = skolem_chase(instance.clone(), &rules);
let restricted = chase(instance, &rules);
assert!(skolem.terminated);
assert!(restricted.terminated);
assert_eq!(
skolem.instance.facts_for_predicate("Path").len(),
restricted.instance.facts_for_predicate("Path").len(),
);
}
#[test] #[test]
fn test_oblivious_chase_via_config() { fn test_oblivious_chase_via_config() {
let instance: Instance = vec![Atom::new("A", vec![Term::constant("x")])] let instance: Instance = vec![Atom::new("A", vec![Term::constant("x")])]

View File

@ -47,6 +47,67 @@ impl NullGenerator {
} }
} }
/// A deterministic null generator keyed on rule identity, the name of the
/// existential variable, and the frontier-variable bindings of the current
/// rule application.
///
/// The Skolem chase uses this so that the same rule application with the same
/// frontier bindings always yields the same labeled null, giving natural
/// termination for rules whose re-application would otherwise invent fresh
/// nulls forever.
#[derive(Debug, Default)]
pub(crate) struct SkolemGenerator {
counter: usize,
cache: HashMap<SkolemKey, Term>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SkolemKey {
rule_index: usize,
existential: String,
frontier_bindings: Vec<(String, Term)>,
}
impl SkolemGenerator {
pub(crate) fn seeded_from(instance: &Instance, rules: &[Rule]) -> Self {
Self {
counter: next_null_id(instance, rules),
cache: HashMap::new(),
}
}
pub(crate) fn skolem_for(
&mut self,
rule_index: usize,
rule: &Rule,
existential: &str,
subst: &Substitution,
) -> Term {
let frontier = rule.frontier_variables();
let mut bindings: Vec<_> = frontier
.into_iter()
.filter_map(|variable| subst.get(&variable).map(|term| (variable, term.clone())))
.collect();
bindings.sort_by(|left, right| left.0.cmp(&right.0));
let key = SkolemKey {
rule_index,
existential: existential.to_string(),
frontier_bindings: bindings,
};
if let Some(term) = self.cache.get(&key) {
return term.clone();
}
let id = self.counter;
self.counter += 1;
let term = Term::Null(id);
self.cache.insert(key, term.clone());
term
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct Trigger { pub(crate) struct Trigger {
rule_index: usize, rule_index: usize,
@ -159,6 +220,67 @@ pub fn find_matches(instance: &Instance, body: &[Atom]) -> Vec<Substitution> {
results results
} }
/// Compute body matches for a chase step, optionally using semi-naive
/// evaluation against a delta of facts added in the previous round.
///
/// When `delta` is `None`, this matches the body against the full instance.
/// When `delta` is `Some`, the result is the union over each body position `i`
/// of matches that bind position `i` against `delta` and the remaining
/// positions against the full instance. Body matches that do not involve any
/// fact from `delta` are skipped, since they would have been produced in an
/// earlier round.
pub(crate) fn find_matches_for_step(
instance: &Instance,
delta: Option<&Instance>,
body: &[Atom],
) -> Vec<Substitution> {
let Some(delta) = delta else {
return find_matches(instance, body);
};
if body.is_empty() {
// Rules with empty bodies fire once at round zero and have no
// body atom to anchor against `delta` afterwards.
return Vec::new();
}
let mut all = Vec::new();
for delta_index in 0..body.len() {
let mut results = vec![Substitution::new()];
for (position, body_atom) in body.iter().enumerate() {
let target = if position == delta_index {
delta
} else {
instance
};
let mut new_results = Vec::new();
for subst in &results {
let pattern = subst.apply_atom(body_atom);
for fact in target.facts_matching_pattern(&pattern) {
if let Some(next_subst) = unify_atom(&pattern, fact) {
let mut combined = subst.clone();
for (var, term) in next_subst.iter() {
combined.bind(var.clone(), term.clone());
}
new_results.push(combined);
}
}
}
results = new_results;
if results.is_empty() {
break;
}
}
all.extend(results);
}
all
}
impl MaterializedState { impl MaterializedState {
pub fn provenance_for(&self, atom: &Atom) -> Option<&Derivation> { pub fn provenance_for(&self, atom: &Atom) -> Option<&Derivation> {
self.provenance.get(atom) self.provenance.get(atom)
@ -194,6 +316,29 @@ pub(crate) fn apply_rule_head(
.collect() .collect()
} }
/// Like [`apply_rule_head`], but binds existential variables to deterministic
/// Skolem nulls based on `(rule_index, existential_name, frontier_bindings)`.
pub(crate) fn apply_rule_head_skolem(
rule_index: usize,
rule: &Rule,
subst: &Substitution,
skolem_gen: &mut SkolemGenerator,
) -> Vec<Atom> {
let mut extended_subst = subst.clone();
let mut existentials = rule.existential_variables().into_iter().collect::<Vec<_>>();
existentials.sort();
for variable in existentials {
let term = skolem_gen.skolem_for(rule_index, rule, &variable, subst);
extended_subst.bind(variable, term);
}
rule.head
.iter()
.map(|atom| extended_subst.apply_atom(atom))
.collect()
}
pub(crate) fn next_null_id(instance: &Instance, rules: &[Rule]) -> usize { pub(crate) fn next_null_id(instance: &Instance, rules: &[Rule]) -> usize {
let instance_max = instance let instance_max = instance
.iter() .iter()

View File

@ -13,7 +13,7 @@ mod engine;
pub use atom::Atom; pub use atom::Atom;
pub use engine::{ pub use engine::{
ChaseConfig, ChaseError, ChaseResult, ChaseVariant, chase, chase_full, chase_with_config, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, chase, chase_full, chase_with_config,
chase_with_egds, oblivious_chase, standard_chase, chase_with_egds, oblivious_chase, skolem_chase, standard_chase,
}; };
pub use inference::{Derivation, MaterializedState, find_matches, materialize}; pub use inference::{Derivation, MaterializedState, find_matches, materialize};
pub use instance::{Instance, InstanceError}; pub use instance::{Instance, InstanceError};

View File

@ -4,6 +4,7 @@
//! provides table scans. The built-in [`Instance`](crate::chase::Instance) //! provides table scans. The built-in [`Instance`](crate::chase::Instance)
//! adapter and the [`TableStore`] are the two provided implementations. //! adapter and the [`TableStore`] are the two provided implementations.
pub mod physical;
pub mod table_store; pub mod table_store;
use std::cmp::Ordering; use std::cmp::Ordering;
@ -11,9 +12,15 @@ use std::error::Error;
use std::fmt; use std::fmt;
use crate::chase::{Instance, Term}; use crate::chase::{Instance, Term};
use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; use crate::planner::logical::{
AggregateExpr as PlanAggregateExpr, LogicalExpr, LogicalPlan, SortDirection, SortKey,
};
use crate::relational::{ResultSet, Row, Schema, Value}; use crate::relational::{ResultSet, Row, Schema, Value};
use crate::sql::ast::AggregateFunc;
pub use physical::{
NamedPhysicalExpr, PhysicalPlan, execute_physical, plan_physical, rewrite_physical,
};
pub use table_store::TableStore; pub use table_store::TableStore;
/// Errors returned by the current logical-plan executor. /// Errors returned by the current logical-plan executor.
@ -129,6 +136,195 @@ pub fn execute(plan: &LogicalPlan, source: &dyn DataSource) -> Result<ResultSet,
let rows = result.rows().iter().take(*count).cloned().collect(); let rows = result.rows().iter().take(*count).cloned().collect();
Ok(ResultSet::new(result.schema().clone(), rows)) Ok(ResultSet::new(result.schema().clone(), rows))
} }
LogicalPlan::Aggregate {
input,
group_by,
aggregates,
schema,
} => {
let result = execute(input, source)?;
let rows = compute_aggregate(result.rows(), result.schema(), group_by, aggregates)?;
Ok(ResultSet::new(schema.clone(), rows))
}
}
}
/// Evaluate group-by + aggregates over a row set, returning one output row
/// per distinct group key. The output row layout is: group_by column values
/// followed by aggregate output values.
pub(crate) fn compute_aggregate(
rows: &[Row],
input_schema: &Schema,
group_by: &[String],
aggregates: &[PlanAggregateExpr],
) -> Result<Vec<Row>, ExecutionError> {
let group_indexes = group_by
.iter()
.map(|name| {
input_schema
.index_of(name)
.ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))
})
.collect::<Result<Vec<_>, _>>()?;
// Each aggregate holds an optional input column index (None means COUNT(*)).
let agg_indexes = aggregates
.iter()
.map(|agg| {
agg.arg
.as_ref()
.map(|col| {
input_schema
.index_of(col)
.ok_or_else(|| ExecutionError::UnknownColumn(col.clone()))
})
.transpose()
})
.collect::<Result<Vec<Option<usize>>, _>>()?;
// Preserve first-seen group order so single-group output is deterministic.
let mut order: Vec<Vec<Value>> = Vec::new();
let mut groups: std::collections::HashMap<Vec<Value>, Vec<AggregateState>> =
std::collections::HashMap::new();
for row in rows {
let key: Vec<Value> = group_indexes
.iter()
.map(|i| row.get(*i).cloned().unwrap_or(Value::Null))
.collect();
let states = groups.entry(key.clone()).or_insert_with(|| {
order.push(key.clone());
aggregates
.iter()
.map(|agg| AggregateState::new(agg.func))
.collect()
});
for (state, index_opt) in states.iter_mut().zip(agg_indexes.iter()) {
let value = match index_opt {
Some(i) => row.get(*i).cloned().unwrap_or(Value::Null),
None => Value::Null, // COUNT(*) observes each row
};
state.observe(&value, index_opt.is_none());
}
}
// If the user wrote an aggregate with no GROUP BY and no input rows, we
// still need one output row (all-null plus zero counts).
if rows.is_empty() && group_by.is_empty() && !aggregates.is_empty() {
order.push(Vec::new());
groups.insert(
Vec::new(),
aggregates
.iter()
.map(|agg| AggregateState::new(agg.func))
.collect(),
);
}
let mut out_rows = Vec::new();
for key in order {
let states = groups.remove(&key).unwrap_or_default();
let mut values = key;
for state in &states {
values.push(state.finalize());
}
out_rows.push(Row::new(values));
}
Ok(out_rows)
}
#[derive(Debug)]
pub(crate) enum AggregateState {
Count(i64),
Sum(Option<i64>),
Min(Option<Value>),
Max(Option<Value>),
Avg { sum: i64, count: i64 },
}
impl AggregateState {
pub(crate) fn new(func: AggregateFunc) -> Self {
match func {
AggregateFunc::Count => Self::Count(0),
AggregateFunc::Sum => Self::Sum(None),
AggregateFunc::Min => Self::Min(None),
AggregateFunc::Max => Self::Max(None),
AggregateFunc::Avg => Self::Avg { sum: 0, count: 0 },
}
}
pub(crate) fn observe(&mut self, value: &Value, is_count_star: bool) {
match self {
Self::Count(c) => {
if is_count_star || !matches!(value, Value::Null) {
*c += 1;
}
}
Self::Sum(total) => {
if let Value::Integer(n) = value {
*total = Some(total.unwrap_or(0) + n);
}
}
Self::Min(current) => {
if !matches!(value, Value::Null) {
match current {
None => *current = Some(value.clone()),
Some(existing) => {
if compare_values_for_agg(value, existing) == std::cmp::Ordering::Less {
*existing = value.clone();
}
}
}
}
}
Self::Max(current) => {
if !matches!(value, Value::Null) {
match current {
None => *current = Some(value.clone()),
Some(existing) => {
if compare_values_for_agg(value, existing)
== std::cmp::Ordering::Greater
{
*existing = value.clone();
}
}
}
}
}
Self::Avg { sum, count } => {
if let Value::Integer(n) = value {
*sum += n;
*count += 1;
}
}
}
}
pub(crate) fn finalize(&self) -> Value {
match self {
Self::Count(c) => Value::Integer(*c),
Self::Sum(total) => total.map(Value::Integer).unwrap_or(Value::Null),
Self::Min(v) | Self::Max(v) => v.clone().unwrap_or(Value::Null),
Self::Avg { sum, count } => {
if *count == 0 {
Value::Null
} else {
Value::Integer(sum / count)
}
}
}
}
}
fn compare_values_for_agg(left: &Value, right: &Value) -> std::cmp::Ordering {
match (left, right) {
(Value::Integer(a), Value::Integer(b)) => a.cmp(b),
(Value::Text(a), Value::Text(b)) => a.cmp(b),
(Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
_ => std::cmp::Ordering::Equal,
} }
} }

823
src/execution/physical.rs Normal file
View File

@ -0,0 +1,823 @@
//! Physical operator scaffolding.
//!
//! Logical plans describe *what* the query computes; physical plans describe
//! *how* it is executed. Today the physical layer mirrors the logical layer
//! one-to-one. The split exists so future work can add operator strategies
//! (for example, a hash join physical operator alongside the current
//! nested-loop join) without changing logical semantics.
//!
//! The current layer is intentionally narrow:
//!
//! - [`PhysicalPlan`] mirrors [`LogicalPlan`] with execution-oriented names.
//! - [`plan_physical`] converts a logical plan into a physical plan.
//! - [`rewrite_physical`] applies a small set of rule-based rewrites.
//! - [`execute_physical`] runs the physical plan against a [`DataSource`].
use std::cmp::Ordering;
use crate::planner::logical::{
AggregateExpr as PlanAggregateExpr, LogicalExpr, LogicalPlan, SortDirection, SortKey,
};
use crate::relational::{ResultSet, Row, Schema, Value};
use super::{DataSource, ExecutionError, compute_aggregate};
/// A physical plan node in the current execution subset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PhysicalPlan {
/// Sequentially scan all rows of a table from the data source.
SeqScan { table: String, schema: Schema },
/// Form the Cartesian product of two inputs by iterating the right
/// input for each row of the left input.
NestedLoopJoin {
left: Box<PhysicalPlan>,
right: Box<PhysicalPlan>,
schema: Schema,
},
/// Filter rows by a predicate expression.
Filter {
input: Box<PhysicalPlan>,
predicate: LogicalExpr,
},
/// Sort rows by one or more output columns.
Sort {
input: Box<PhysicalPlan>,
keys: Vec<SortKey>,
schema: Schema,
},
/// Project a new output schema by evaluating expressions per row.
Project {
input: Box<PhysicalPlan>,
expressions: Vec<NamedPhysicalExpr>,
schema: Schema,
},
/// Limit the number of output rows.
Limit {
input: Box<PhysicalPlan>,
count: usize,
},
/// Compute aggregates per group key using an in-memory hash map.
HashAggregate {
input: Box<PhysicalPlan>,
group_by: Vec<String>,
aggregates: Vec<PlanAggregateExpr>,
schema: Schema,
},
}
/// A named physical expression in a projection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamedPhysicalExpr {
/// Output column name.
pub name: String,
/// Expression to evaluate.
pub expr: LogicalExpr,
}
impl PhysicalPlan {
/// Return the schema produced by this physical plan.
pub fn output_schema(&self) -> &Schema {
match self {
Self::SeqScan { schema, .. } => schema,
Self::NestedLoopJoin { schema, .. } => schema,
Self::Filter { input, .. } => input.output_schema(),
Self::Sort { schema, .. } => schema,
Self::Project { schema, .. } => schema,
Self::Limit { input, .. } => input.output_schema(),
Self::HashAggregate { schema, .. } => schema,
}
}
}
/// Translate a [`LogicalPlan`] into a [`PhysicalPlan`].
///
/// This is currently a one-to-one mapping. Logical `Scan` becomes physical
/// `SeqScan`, logical `CrossJoin` becomes physical `NestedLoopJoin`, and
/// other operators keep their names. Future strategy choices belong here.
pub fn plan_physical(plan: &LogicalPlan) -> PhysicalPlan {
match plan {
LogicalPlan::Scan { table, schema } => PhysicalPlan::SeqScan {
table: table.clone(),
schema: schema.clone(),
},
LogicalPlan::CrossJoin {
left,
right,
schema,
} => PhysicalPlan::NestedLoopJoin {
left: Box::new(plan_physical(left)),
right: Box::new(plan_physical(right)),
schema: schema.clone(),
},
LogicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
input: Box::new(plan_physical(input)),
predicate: predicate.clone(),
},
LogicalPlan::Sort {
input,
keys,
schema,
} => PhysicalPlan::Sort {
input: Box::new(plan_physical(input)),
keys: keys.clone(),
schema: schema.clone(),
},
LogicalPlan::Project {
input,
expressions,
schema,
} => PhysicalPlan::Project {
input: Box::new(plan_physical(input)),
expressions: expressions
.iter()
.map(|named| NamedPhysicalExpr {
name: named.name.clone(),
expr: named.expr.clone(),
})
.collect(),
schema: schema.clone(),
},
LogicalPlan::Limit { input, count } => PhysicalPlan::Limit {
input: Box::new(plan_physical(input)),
count: *count,
},
LogicalPlan::Aggregate {
input,
group_by,
aggregates,
schema,
} => PhysicalPlan::HashAggregate {
input: Box::new(plan_physical(input)),
group_by: group_by.clone(),
aggregates: aggregates.clone(),
schema: schema.clone(),
},
}
}
/// Apply rule-based rewrites to a physical plan.
///
/// Current rewrites:
/// - [`combine_adjacent_limits`] collapses `Limit(Limit(child, n), m)` into
/// `Limit(child, min(n, m))`.
/// - [`push_filter_below_join`] pushes conjuncts of a `Filter` below a
/// `NestedLoopJoin` when they reference only one side's columns, so the
/// join sees fewer rows.
pub fn rewrite_physical(plan: PhysicalPlan) -> PhysicalPlan {
let plan = push_filter_below_join(plan);
combine_adjacent_limits(plan)
}
/// Push conjuncts of a `Filter` below a `NestedLoopJoin` when each conjunct
/// references only columns from one side of the join. Conjuncts that mention
/// both sides remain above the join.
fn push_filter_below_join(plan: PhysicalPlan) -> PhysicalPlan {
match plan {
PhysicalPlan::Filter { input, predicate } => {
let pushed_input = push_filter_below_join(*input);
match pushed_input {
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => {
let left_cols: Vec<String> = left
.output_schema()
.fields()
.iter()
.map(|f| f.name().to_string())
.collect();
let right_cols: Vec<String> = right
.output_schema()
.fields()
.iter()
.map(|f| f.name().to_string())
.collect();
let mut left_conjuncts: Vec<LogicalExpr> = Vec::new();
let mut right_conjuncts: Vec<LogicalExpr> = Vec::new();
let mut remaining: Vec<LogicalExpr> = Vec::new();
for conjunct in split_conjuncts(predicate) {
let refs = collect_column_refs(&conjunct);
let all_left = refs.iter().all(|c| left_cols.contains(c));
let all_right = refs.iter().all(|c| right_cols.contains(c));
if !refs.is_empty() && all_left {
left_conjuncts.push(conjunct);
} else if !refs.is_empty() && all_right {
right_conjuncts.push(conjunct);
} else {
remaining.push(conjunct);
}
}
let left = if let Some(pred) = combine_conjuncts(left_conjuncts) {
Box::new(PhysicalPlan::Filter {
input: left,
predicate: pred,
})
} else {
left
};
let right = if let Some(pred) = combine_conjuncts(right_conjuncts) {
Box::new(PhysicalPlan::Filter {
input: right,
predicate: pred,
})
} else {
right
};
// Recurse so pushed filters below the join continue to
// push through deeper joins if any.
let left = Box::new(push_filter_below_join(*left));
let right = Box::new(push_filter_below_join(*right));
let joined = PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
};
match combine_conjuncts(remaining) {
Some(pred) => PhysicalPlan::Filter {
input: Box::new(joined),
predicate: pred,
},
None => joined,
}
}
other => PhysicalPlan::Filter {
input: Box::new(other),
predicate,
},
}
}
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => PhysicalPlan::NestedLoopJoin {
left: Box::new(push_filter_below_join(*left)),
right: Box::new(push_filter_below_join(*right)),
schema,
},
PhysicalPlan::Sort {
input,
keys,
schema,
} => PhysicalPlan::Sort {
input: Box::new(push_filter_below_join(*input)),
keys,
schema,
},
PhysicalPlan::Project {
input,
expressions,
schema,
} => PhysicalPlan::Project {
input: Box::new(push_filter_below_join(*input)),
expressions,
schema,
},
PhysicalPlan::Limit { input, count } => PhysicalPlan::Limit {
input: Box::new(push_filter_below_join(*input)),
count,
},
PhysicalPlan::HashAggregate {
input,
group_by,
aggregates,
schema,
} => PhysicalPlan::HashAggregate {
input: Box::new(push_filter_below_join(*input)),
group_by,
aggregates,
schema,
},
leaf @ PhysicalPlan::SeqScan { .. } => leaf,
}
}
fn split_conjuncts(expr: LogicalExpr) -> Vec<LogicalExpr> {
let mut out = Vec::new();
let mut stack = vec![expr];
while let Some(node) = stack.pop() {
match node {
LogicalExpr::And(left, right) => {
stack.push(*right);
stack.push(*left);
}
other => out.push(other),
}
}
out
}
fn combine_conjuncts(mut conjuncts: Vec<LogicalExpr>) -> Option<LogicalExpr> {
if conjuncts.is_empty() {
return None;
}
let mut combined = conjuncts.remove(0);
for next in conjuncts {
combined = LogicalExpr::And(Box::new(combined), Box::new(next));
}
Some(combined)
}
fn collect_column_refs(expr: &LogicalExpr) -> Vec<String> {
let mut out = Vec::new();
fn walk(expr: &LogicalExpr, out: &mut Vec<String>) {
match expr {
LogicalExpr::Column(name) => out.push(name.clone()),
LogicalExpr::Literal(_) => {}
LogicalExpr::Eq(left, right)
| LogicalExpr::Ne(left, right)
| LogicalExpr::And(left, right)
| LogicalExpr::Or(left, right) => {
walk(left, out);
walk(right, out);
}
}
}
walk(expr, &mut out);
out
}
fn combine_adjacent_limits(plan: PhysicalPlan) -> PhysicalPlan {
match plan {
PhysicalPlan::Limit { input, count } => {
let inner = combine_adjacent_limits(*input);
match inner {
PhysicalPlan::Limit {
input: child,
count: inner_count,
} => PhysicalPlan::Limit {
input: child,
count: count.min(inner_count),
},
other => PhysicalPlan::Limit {
input: Box::new(other),
count,
},
}
}
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => PhysicalPlan::NestedLoopJoin {
left: Box::new(combine_adjacent_limits(*left)),
right: Box::new(combine_adjacent_limits(*right)),
schema,
},
PhysicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
input: Box::new(combine_adjacent_limits(*input)),
predicate,
},
PhysicalPlan::Sort {
input,
keys,
schema,
} => PhysicalPlan::Sort {
input: Box::new(combine_adjacent_limits(*input)),
keys,
schema,
},
PhysicalPlan::Project {
input,
expressions,
schema,
} => PhysicalPlan::Project {
input: Box::new(combine_adjacent_limits(*input)),
expressions,
schema,
},
PhysicalPlan::HashAggregate {
input,
group_by,
aggregates,
schema,
} => PhysicalPlan::HashAggregate {
input: Box::new(combine_adjacent_limits(*input)),
group_by,
aggregates,
schema,
},
leaf @ PhysicalPlan::SeqScan { .. } => leaf,
}
}
/// Execute a physical plan against the provided data source.
pub fn execute_physical(
plan: &PhysicalPlan,
source: &dyn DataSource,
) -> Result<ResultSet, ExecutionError> {
match plan {
PhysicalPlan::SeqScan { table, schema } => source.scan(table, schema),
PhysicalPlan::NestedLoopJoin {
left,
right,
schema,
} => {
let left_result = execute_physical(left, source)?;
let right_result = execute_physical(right, source)?;
let mut rows = Vec::new();
for left_row in left_result.rows() {
for right_row in right_result.rows() {
let mut values = left_row.values().to_vec();
values.extend_from_slice(right_row.values());
rows.push(Row::new(values));
}
}
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Filter { input, predicate } => {
let result = execute_physical(input, source)?;
let filtered_rows = result
.rows()
.iter()
.filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false))
.cloned()
.collect();
Ok(ResultSet::new(result.schema().clone(), filtered_rows))
}
PhysicalPlan::Project {
input,
expressions,
schema,
} => {
let result = execute_physical(input, source)?;
let mut rows = Vec::new();
for row in result.rows() {
let values = expressions
.iter()
.map(|named| eval_expr(&named.expr, row, result.schema()))
.collect::<Result<Vec<_>, _>>()?;
rows.push(Row::new(values));
}
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Sort {
input,
keys,
schema,
} => {
let result = execute_physical(input, source)?;
let mut rows = result.rows().to_vec();
let resolved = resolve_sort_keys(keys, result.schema())?;
rows.sort_by(|left, right| compare_rows(left, right, &resolved));
Ok(ResultSet::new(schema.clone(), rows))
}
PhysicalPlan::Limit { input, count } => {
let result = execute_physical(input, source)?;
let rows = result.rows().iter().take(*count).cloned().collect();
Ok(ResultSet::new(result.schema().clone(), rows))
}
PhysicalPlan::HashAggregate {
input,
group_by,
aggregates,
schema,
} => {
let result = execute_physical(input, source)?;
let rows = compute_aggregate(result.rows(), result.schema(), group_by, aggregates)?;
Ok(ResultSet::new(schema.clone(), rows))
}
}
}
fn eval_predicate(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result<bool, ExecutionError> {
match expr {
LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.unwrap_or(false)),
LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.map(|eq| !eq)
.unwrap_or(false)),
LogicalExpr::And(left, right) => {
Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?)
}
LogicalExpr::Or(left, right) => {
Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?)
}
_ => Ok(false),
}
}
fn eval_expr(expr: &LogicalExpr, row: &Row, schema: &Schema) -> Result<Value, ExecutionError> {
match expr {
LogicalExpr::Column(name) => {
let index = schema
.index_of(name)
.ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?;
Ok(row.get(index).cloned().unwrap_or(Value::Null))
}
LogicalExpr::Literal(value) => Ok(value.clone()),
LogicalExpr::Eq(left, right) => {
let left = eval_expr(left, row, schema)?;
let right = eval_expr(right, row, schema)?;
Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false)))
}
LogicalExpr::Ne(left, right) => {
let left = eval_expr(left, row, schema)?;
let right = eval_expr(right, row, schema)?;
Ok(Value::Boolean(
left.sql_eq(&right).map(|eq| !eq).unwrap_or(false),
))
}
LogicalExpr::And(left, right) => Ok(Value::Boolean(
eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?,
)),
LogicalExpr::Or(left, right) => Ok(Value::Boolean(
eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?,
)),
}
}
fn resolve_sort_keys(
keys: &[SortKey],
schema: &Schema,
) -> Result<Vec<(usize, SortDirection)>, ExecutionError> {
keys.iter()
.map(|key| {
let index = schema
.index_of(&key.column)
.ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?;
Ok((index, key.direction))
})
.collect()
}
fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering {
for (index, direction) in keys {
let left_value = left.get(*index).unwrap_or(&Value::Null);
let right_value = right.get(*index).unwrap_or(&Value::Null);
let ordering = compare_values(left_value, right_value);
if ordering != Ordering::Equal {
return match direction {
SortDirection::Asc => ordering,
SortDirection::Desc => ordering.reverse(),
};
}
}
Ordering::Equal
}
fn compare_values(left: &Value, right: &Value) -> Ordering {
match (left, right) {
(Value::Null, Value::Null) => Ordering::Equal,
(Value::Null, _) => Ordering::Greater,
(_, Value::Null) => Ordering::Less,
(Value::Text(left), Value::Text(right)) => left.cmp(right),
(Value::Integer(left), Value::Integer(right)) => left.cmp(right),
(Value::Boolean(left), Value::Boolean(right)) => left.cmp(right),
(Value::Integer(_), _) => Ordering::Less,
(_, Value::Integer(_)) => Ordering::Greater,
(Value::Text(_), Value::Boolean(_)) => Ordering::Less,
(Value::Boolean(_), Value::Text(_)) => Ordering::Greater,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::chase::{Atom, Instance, Term};
use crate::planner::logical::{LogicalPlan, NamedExpr};
use crate::relational::{DataType, Field};
fn parent_instance() -> Instance {
vec![
Atom::new(
"Parent",
vec![Term::constant("alice"), Term::constant("bob")],
),
Atom::new(
"Parent",
vec![Term::constant("bob"), Term::constant("carol")],
),
]
.into_iter()
.collect()
}
fn parent_schema() -> Schema {
Schema::new(vec![
Field::new("c0", DataType::Text, false),
Field::new("c1", DataType::Text, false),
])
}
#[test]
fn plan_physical_mirrors_logical_scan() {
let logical = LogicalPlan::Scan {
table: "Parent".to_string(),
schema: parent_schema(),
};
let physical = plan_physical(&logical);
assert!(matches!(physical, PhysicalPlan::SeqScan { .. }));
}
#[test]
fn execute_physical_runs_scan_and_limit() {
let schema = parent_schema();
let logical = LogicalPlan::Limit {
input: Box::new(LogicalPlan::Scan {
table: "Parent".to_string(),
schema: schema.clone(),
}),
count: 1,
};
let physical = plan_physical(&logical);
let result = execute_physical(&physical, &parent_instance()).unwrap();
assert_eq!(result.rows().len(), 1);
}
#[test]
fn rewrite_collapses_adjacent_limits() {
let schema = parent_schema();
let nested = PhysicalPlan::Limit {
input: Box::new(PhysicalPlan::Limit {
input: Box::new(PhysicalPlan::SeqScan {
table: "Parent".to_string(),
schema,
}),
count: 5,
}),
count: 2,
};
let rewritten = rewrite_physical(nested);
match rewritten {
PhysicalPlan::Limit { input, count } => {
assert_eq!(count, 2);
assert!(matches!(*input, PhysicalPlan::SeqScan { .. }));
}
other => panic!("expected single Limit, got {:?}", other),
}
}
#[test]
fn execute_physical_handles_projection_and_filter() {
let schema = parent_schema();
let logical = LogicalPlan::Project {
input: Box::new(LogicalPlan::Filter {
input: Box::new(LogicalPlan::Scan {
table: "Parent".to_string(),
schema: schema.clone(),
}),
predicate: LogicalExpr::Eq(
Box::new(LogicalExpr::Column("c1".to_string())),
Box::new(LogicalExpr::Literal(Value::text("bob"))),
),
}),
expressions: vec![NamedExpr {
name: "c0".to_string(),
expr: LogicalExpr::Column("c0".to_string()),
}],
schema: Schema::new(vec![Field::new("c0", DataType::Text, false)]),
};
let physical = rewrite_physical(plan_physical(&logical));
let result = execute_physical(&physical, &parent_instance()).unwrap();
assert_eq!(result.rows().len(), 1);
assert_eq!(result.rows()[0].values()[0], Value::text("alice"));
}
#[test]
fn rewrite_pushes_single_side_filter_below_join() {
let left_schema = Schema::new(vec![
Field::new("Parent.parent", DataType::Text, false),
Field::new("Parent.child", DataType::Text, false),
]);
let right_schema = Schema::new(vec![
Field::new("Ancestor.parent", DataType::Text, false),
Field::new("Ancestor.child", DataType::Text, false),
]);
let join_schema = Schema::new(
left_schema
.fields()
.iter()
.chain(right_schema.fields())
.cloned()
.collect(),
);
// Filter(
// NestedLoopJoin(Parent, Ancestor),
// Parent.parent = 'alice' AND Parent.child = Ancestor.parent,
// )
let plan = PhysicalPlan::Filter {
input: Box::new(PhysicalPlan::NestedLoopJoin {
left: Box::new(PhysicalPlan::SeqScan {
table: "Parent".to_string(),
schema: left_schema,
}),
right: Box::new(PhysicalPlan::SeqScan {
table: "Ancestor".to_string(),
schema: right_schema,
}),
schema: join_schema,
}),
predicate: LogicalExpr::And(
Box::new(LogicalExpr::Eq(
Box::new(LogicalExpr::Column("Parent.parent".to_string())),
Box::new(LogicalExpr::Literal(Value::text("alice"))),
)),
Box::new(LogicalExpr::Eq(
Box::new(LogicalExpr::Column("Parent.child".to_string())),
Box::new(LogicalExpr::Column("Ancestor.parent".to_string())),
)),
),
};
let rewritten = rewrite_physical(plan);
match rewritten {
// The Parent.parent = 'alice' predicate should be pushed onto the
// left side; the join predicate should remain above.
PhysicalPlan::Filter { input, .. } => match *input {
PhysicalPlan::NestedLoopJoin { left, .. } => {
assert!(matches!(*left, PhysicalPlan::Filter { .. }));
}
other => panic!("expected NestedLoopJoin under Filter, got {:?}", other),
},
other => panic!("expected outer Filter, got {:?}", other),
}
}
#[test]
fn rewrite_push_filter_preserves_semantics_on_join() {
// Two three-row tables, join predicate filters down to one row.
// Push-down should not change the row count or values.
struct TwoTable;
impl DataSource for TwoTable {
fn scan(&self, table: &str, schema: &Schema) -> Result<ResultSet, ExecutionError> {
let rows = match table {
"L" => vec![
Row::new(vec![Value::text("alice"), Value::text("bob")]),
Row::new(vec![Value::text("bob"), Value::text("carol")]),
Row::new(vec![Value::text("carol"), Value::text("dave")]),
],
"R" => vec![
Row::new(vec![Value::text("bob"), Value::text("x")]),
Row::new(vec![Value::text("carol"), Value::text("y")]),
Row::new(vec![Value::text("eve"), Value::text("z")]),
],
_ => Vec::new(),
};
Ok(ResultSet::new(schema.clone(), rows))
}
}
let left_schema = Schema::new(vec![
Field::new("L.a", DataType::Text, false),
Field::new("L.b", DataType::Text, false),
]);
let right_schema = Schema::new(vec![
Field::new("R.a", DataType::Text, false),
Field::new("R.b", DataType::Text, false),
]);
let join_schema = Schema::new(
left_schema
.fields()
.iter()
.chain(right_schema.fields())
.cloned()
.collect(),
);
let plan = PhysicalPlan::Filter {
input: Box::new(PhysicalPlan::NestedLoopJoin {
left: Box::new(PhysicalPlan::SeqScan {
table: "L".to_string(),
schema: left_schema,
}),
right: Box::new(PhysicalPlan::SeqScan {
table: "R".to_string(),
schema: right_schema,
}),
schema: join_schema,
}),
predicate: LogicalExpr::And(
Box::new(LogicalExpr::Eq(
Box::new(LogicalExpr::Column("L.a".to_string())),
Box::new(LogicalExpr::Literal(Value::text("bob"))),
)),
Box::new(LogicalExpr::Eq(
Box::new(LogicalExpr::Column("L.b".to_string())),
Box::new(LogicalExpr::Column("R.a".to_string())),
)),
),
};
let before = execute_physical(&plan, &TwoTable).unwrap();
let after = execute_physical(&rewrite_physical(plan.clone()), &TwoTable).unwrap();
assert_eq!(before.rows().len(), after.rows().len());
assert_eq!(before.rows(), after.rows());
}
}

View File

@ -0,0 +1,72 @@
//! An in-memory table store backed by hash maps.
use std::collections::HashMap;
use crate::relational::{ResultSet, Row, Schema};
use super::{DataSource, ExecutionError};
/// A simple in-memory data source backed by named tables of rows.
///
/// Unlike [`Instance`](crate::chase::Instance), which stores chase-level atoms,
/// `TableStore` holds typed relational rows directly. This makes it useful for
/// testing and for scenarios where data does not originate from the chase engine.
#[derive(Debug, Clone, Default)]
pub struct TableStore {
tables: HashMap<String, (Schema, Vec<Row>)>,
}
impl TableStore {
/// Create an empty table store.
pub fn new() -> Self {
Self::default()
}
/// Register a table with its schema and initial rows.
pub fn insert(&mut self, name: impl Into<String>, schema: Schema, rows: Vec<Row>) {
self.tables.insert(name.into(), (schema, rows));
}
}
impl DataSource for TableStore {
fn scan(&self, table: &str, schema: &Schema) -> Result<ResultSet, ExecutionError> {
match self.tables.get(table) {
Some((_, rows)) => Ok(ResultSet::new(schema.clone(), rows.clone())),
None => Ok(ResultSet::new(schema.clone(), Vec::new())),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::relational::{DataType, Field, Value};
#[test]
fn scans_registered_table() {
let schema = Schema::new(vec![
Field::new("name", DataType::Text, false),
Field::new("age", DataType::Integer, false),
]);
let rows = vec![
Row::new(vec![Value::text("alice"), Value::Integer(30)]),
Row::new(vec![Value::text("bob"), Value::Integer(25)]),
];
let mut store = TableStore::new();
store.insert("people", schema.clone(), rows);
let result = store.scan("people", &schema).unwrap();
assert_eq!(result.rows().len(), 2);
assert_eq!(result.schema().fields()[0].name(), "name");
}
#[test]
fn returns_empty_for_missing_table() {
let store = TableStore::new();
let schema = Schema::new(vec![]);
let result = store.scan("missing", &schema).unwrap();
assert_eq!(result.rows().len(), 0);
}
}

View File

@ -18,5 +18,5 @@ pub mod sql;
// Lower-level reasoning and provenance APIs remain under `query_engine::chase`. // Lower-level reasoning and provenance APIs remain under `query_engine::chase`.
pub use chase::{ pub use chase::{
Atom, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, Instance, Rule, RuleBuilder, Term, Atom, ChaseConfig, ChaseError, ChaseResult, ChaseVariant, Instance, Rule, RuleBuilder, Term,
chase, chase_with_config, oblivious_chase, standard_chase, chase, chase_with_config, oblivious_chase, skolem_chase, standard_chase,
}; };

View File

@ -1,4 +1,5 @@
use crate::relational::{Schema, Value}; use crate::relational::{Schema, Value};
use crate::sql::ast::AggregateFunc;
/// Sort direction for the logical `Sort` operator. /// Sort direction for the logical `Sort` operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -44,6 +45,17 @@ pub struct SortKey {
pub direction: SortDirection, pub direction: SortDirection,
} }
/// A single aggregate output in a logical `Aggregate` operator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AggregateExpr {
/// Output column name for this aggregate.
pub name: String,
/// Aggregate function to apply.
pub func: AggregateFunc,
/// Source column name for the aggregate input, or `None` for `COUNT(*)`.
pub arg: Option<String>,
}
/// A logical plan in the current execution subset. /// A logical plan in the current execution subset.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogicalPlan { pub enum LogicalPlan {
@ -60,6 +72,14 @@ pub enum LogicalPlan {
input: Box<LogicalPlan>, input: Box<LogicalPlan>,
predicate: LogicalExpr, predicate: LogicalExpr,
}, },
/// Group rows by a list of columns and compute aggregates per group.
/// The output schema is `group_by` columns followed by aggregate outputs.
Aggregate {
input: Box<LogicalPlan>,
group_by: Vec<String>,
aggregates: Vec<AggregateExpr>,
schema: Schema,
},
/// Sort rows by one or more output columns. /// Sort rows by one or more output columns.
Sort { Sort {
input: Box<LogicalPlan>, input: Box<LogicalPlan>,
@ -86,6 +106,7 @@ impl LogicalPlan {
Self::Scan { schema, .. } => schema, Self::Scan { schema, .. } => schema,
Self::CrossJoin { schema, .. } => schema, Self::CrossJoin { schema, .. } => schema,
Self::Filter { input, .. } => input.output_schema(), Self::Filter { input, .. } => input.output_schema(),
Self::Aggregate { schema, .. } => schema,
Self::Sort { schema, .. } => schema, Self::Sort { schema, .. } => schema,
Self::Project { schema, .. } => schema, Self::Project { schema, .. } => schema,
Self::Limit { input, .. } => input.output_schema(), Self::Limit { input, .. } => input.output_schema(),

View File

@ -4,11 +4,13 @@ use std::fmt;
use crate::catalog::{CatalogError, PredicateCatalog}; use crate::catalog::{CatalogError, PredicateCatalog};
use crate::planner::logical::{ use crate::planner::logical::{
LogicalExpr, LogicalPlan, NamedExpr, SortDirection as LogicalSortDirection, SortKey, AggregateExpr as PlanAggregateExpr, LogicalExpr, LogicalPlan, NamedExpr,
SortDirection as LogicalSortDirection, SortKey,
}; };
use crate::relational::{DataType, Field, Schema, Value}; use crate::relational::{DataType, Field, Schema, Value};
use crate::sql::ast::{ use crate::sql::ast::{
BinaryOp, Expr, Literal, OrderByItem, Select, SelectItem, SortDirection, TableRef, AggregateArg, AggregateFunc, BinaryOp, Expr, Literal, OrderByItem, Select, SelectItem,
SortDirection, TableRef,
}; };
/// Errors returned when translating SQL AST into a logical plan. /// Errors returned when translating SQL AST into a logical plan.
@ -24,6 +26,14 @@ pub enum PlannerError {
UnsupportedOrderBy, UnsupportedOrderBy,
/// The parser or AST contains a wildcard mixed with other projection items. /// The parser or AST contains a wildcard mixed with other projection items.
MixedWildcardProjection, MixedWildcardProjection,
/// A `GROUP BY` expression is not a simple column reference.
UnsupportedGroupBy,
/// A projected column is neither aggregated nor present in `GROUP BY`.
ProjectionNotGrouped(String),
/// An aggregate expression appears in an unsupported position.
UnsupportedAggregate,
/// `COUNT(*)` was used with a non-count aggregate function.
StarArgNotAllowed,
} }
impl fmt::Display for PlannerError { impl fmt::Display for PlannerError {
@ -43,6 +53,18 @@ impl fmt::Display for PlannerError {
"wildcard projections cannot be combined with other items" "wildcard projections cannot be combined with other items"
) )
} }
Self::UnsupportedGroupBy => {
write!(f, "only bare column references are supported in GROUP BY")
}
Self::ProjectionNotGrouped(name) => {
write!(f, "column `{}` is not aggregated and not in GROUP BY", name)
}
Self::UnsupportedAggregate => {
write!(f, "aggregate expressions are only allowed in SELECT items")
}
Self::StarArgNotAllowed => {
write!(f, "`*` is only allowed as the argument to COUNT")
}
} }
} }
} }
@ -54,7 +76,11 @@ impl Error for PlannerError {
Self::UnknownColumn(_) Self::UnknownColumn(_)
| Self::DuplicateSourceName(_) | Self::DuplicateSourceName(_)
| Self::UnsupportedOrderBy | Self::UnsupportedOrderBy
| Self::MixedWildcardProjection => None, | Self::MixedWildcardProjection
| Self::UnsupportedGroupBy
| Self::ProjectionNotGrouped(_)
| Self::UnsupportedAggregate
| Self::StarArgNotAllowed => None,
} }
} }
} }
@ -80,7 +106,15 @@ pub fn plan_select(
}; };
} }
if !is_wildcard_projection(&select.projection) { let is_aggregate_query = !select.group_by.is_empty()
|| select.projection.iter().any(|item| match item {
SelectItem::Expr { expr, .. } => contains_aggregate(expr),
SelectItem::Wildcard => false,
});
if is_aggregate_query {
plan = plan_aggregate(plan, &input_schema, select)?;
} else if !is_wildcard_projection(&select.projection) {
let mut expressions = Vec::new(); let mut expressions = Vec::new();
let mut fields = Vec::new(); let mut fields = Vec::new();
for (index, item) in select.projection.iter().enumerate() { for (index, item) in select.projection.iter().enumerate() {
@ -122,6 +156,208 @@ pub fn plan_select(
Ok(plan) Ok(plan)
} }
fn contains_aggregate(expr: &Expr) -> bool {
match expr {
Expr::Aggregate { .. } => true,
Expr::Binary { left, right, .. } => contains_aggregate(left) || contains_aggregate(right),
Expr::Identifier(_) | Expr::Literal(_) => false,
}
}
fn plan_aggregate(
input: LogicalPlan,
input_schema: &Schema,
select: &Select,
) -> Result<LogicalPlan, PlannerError> {
// Resolve GROUP BY expressions to column names.
let mut group_by_cols = Vec::new();
for expr in &select.group_by {
match expr {
Expr::Identifier(name) => {
let resolved = resolve_column_name(name, input_schema, &select.from)?;
group_by_cols.push(resolved);
}
_ => return Err(PlannerError::UnsupportedGroupBy),
}
}
// Walk the projection, collecting aggregate expressions and verifying
// non-aggregate column references are in GROUP BY.
let mut aggregates: Vec<PlanAggregateExpr> = Vec::new();
let mut projection_items: Vec<(String, ProjectionSource)> = Vec::new();
for (index, item) in select.projection.iter().enumerate() {
match item {
SelectItem::Wildcard => return Err(PlannerError::MixedWildcardProjection),
SelectItem::Expr { expr, alias } => {
let output_name = alias
.clone()
.unwrap_or_else(|| default_projection_name(expr, index + 1));
let source = plan_aggregate_projection(
expr,
input_schema,
select,
&group_by_cols,
&mut aggregates,
)?;
projection_items.push((output_name, source));
}
}
}
// Build the Aggregate node's output schema: group_by columns followed by
// aggregate outputs.
let mut agg_fields = Vec::new();
for col in &group_by_cols {
let field_index = input_schema
.index_of(col)
.ok_or_else(|| PlannerError::UnknownColumn(col.clone()))?;
let field = &input_schema.fields()[field_index];
agg_fields.push(Field::new(
col.clone(),
field.data_type().clone(),
field.nullable(),
));
}
for agg in &aggregates {
let (dtype, nullable) = aggregate_output_type(agg, input_schema)?;
agg_fields.push(Field::new(agg.name.clone(), dtype, nullable));
}
let agg_schema = Schema::new(agg_fields);
let aggregate_plan = LogicalPlan::Aggregate {
input: Box::new(input),
group_by: group_by_cols.clone(),
aggregates,
schema: agg_schema.clone(),
};
// Build the final Project over the aggregate output.
let mut expressions = Vec::new();
let mut fields = Vec::new();
for (name, source) in projection_items {
let (expr, dtype, nullable) = match source {
ProjectionSource::GroupColumn(col) => {
let index = agg_schema
.index_of(&col)
.ok_or_else(|| PlannerError::UnknownColumn(col.clone()))?;
let field = &agg_schema.fields()[index];
(
LogicalExpr::Column(col),
field.data_type().clone(),
field.nullable(),
)
}
ProjectionSource::AggregateColumn(col) => {
let index = agg_schema
.index_of(&col)
.ok_or_else(|| PlannerError::UnknownColumn(col.clone()))?;
let field = &agg_schema.fields()[index];
(
LogicalExpr::Column(col),
field.data_type().clone(),
field.nullable(),
)
}
ProjectionSource::Literal(value) => {
let (dtype, nullable) = literal_metadata(&value);
(LogicalExpr::Literal(value), dtype, nullable)
}
};
expressions.push(NamedExpr {
name: name.clone(),
expr,
});
fields.push(Field::new(name, dtype, nullable));
}
Ok(LogicalPlan::Project {
input: Box::new(aggregate_plan),
expressions,
schema: Schema::new(fields),
})
}
#[derive(Debug, Clone)]
enum ProjectionSource {
GroupColumn(String),
AggregateColumn(String),
Literal(Value),
}
fn plan_aggregate_projection(
expr: &Expr,
input_schema: &Schema,
select: &Select,
group_by_cols: &[String],
aggregates: &mut Vec<PlanAggregateExpr>,
) -> Result<ProjectionSource, PlannerError> {
match expr {
Expr::Aggregate { func, arg } => {
let arg_col = match arg {
AggregateArg::Star => {
if !matches!(func, AggregateFunc::Count) {
return Err(PlannerError::StarArgNotAllowed);
}
None
}
AggregateArg::Expr(inner) => match inner.as_ref() {
Expr::Identifier(name) => {
Some(resolve_column_name(name, input_schema, &select.from)?)
}
_ => return Err(PlannerError::UnsupportedAggregate),
},
};
let synthetic_name = format!("__agg_{}", aggregates.len());
aggregates.push(PlanAggregateExpr {
name: synthetic_name.clone(),
func: *func,
arg: arg_col,
});
Ok(ProjectionSource::AggregateColumn(synthetic_name))
}
Expr::Identifier(name) => {
let resolved = resolve_column_name(name, input_schema, &select.from)?;
if !group_by_cols.contains(&resolved) {
return Err(PlannerError::ProjectionNotGrouped(name.clone()));
}
Ok(ProjectionSource::GroupColumn(resolved))
}
Expr::Literal(literal) => Ok(ProjectionSource::Literal(plan_literal(literal))),
Expr::Binary { .. } => Err(PlannerError::UnsupportedAggregate),
}
}
fn aggregate_output_type(
agg: &PlanAggregateExpr,
input_schema: &Schema,
) -> Result<(DataType, bool), PlannerError> {
match agg.func {
AggregateFunc::Count => Ok((DataType::Integer, false)),
AggregateFunc::Sum | AggregateFunc::Avg => Ok((DataType::Integer, true)),
AggregateFunc::Min | AggregateFunc::Max => {
if let Some(col) = &agg.arg {
let index = input_schema
.index_of(col)
.ok_or_else(|| PlannerError::UnknownColumn(col.clone()))?;
let field = &input_schema.fields()[index];
Ok((field.data_type().clone(), true))
} else {
Ok((DataType::Text, true))
}
}
}
}
fn literal_metadata(value: &Value) -> (DataType, bool) {
match value {
Value::Text(_) => (DataType::Text, false),
Value::Integer(_) => (DataType::Integer, false),
Value::Boolean(_) => (DataType::Boolean, false),
Value::Null => (DataType::Text, true),
}
}
fn is_wildcard_projection(items: &[SelectItem]) -> bool { fn is_wildcard_projection(items: &[SelectItem]) -> bool {
matches!(items, [SelectItem::Wildcard]) matches!(items, [SelectItem::Wildcard])
} }
@ -202,6 +438,7 @@ fn plan_expr(
Box::new(plan_expr(right, schema, tables)?), Box::new(plan_expr(right, schema, tables)?),
)), )),
}, },
Expr::Aggregate { .. } => Err(PlannerError::UnsupportedAggregate),
} }
} }
@ -264,6 +501,7 @@ fn projection_metadata(
Expr::Literal(Literal::Integer(_)) => Ok((DataType::Integer, false)), Expr::Literal(Literal::Integer(_)) => Ok((DataType::Integer, false)),
Expr::Literal(Literal::Null) => Ok((DataType::Text, true)), Expr::Literal(Literal::Null) => Ok((DataType::Text, true)),
Expr::Binary { .. } => Ok((DataType::Boolean, true)), Expr::Binary { .. } => Ok((DataType::Boolean, true)),
Expr::Aggregate { .. } => Err(PlannerError::UnsupportedAggregate),
} }
} }
@ -290,6 +528,23 @@ fn resolve_column_name(
fn default_projection_name(expr: &Expr, ordinal: usize) -> String { fn default_projection_name(expr: &Expr, ordinal: usize) -> String {
match expr { match expr {
Expr::Aggregate { func, arg } => {
let func_name = match func {
AggregateFunc::Count => "COUNT",
AggregateFunc::Sum => "SUM",
AggregateFunc::Min => "MIN",
AggregateFunc::Max => "MAX",
AggregateFunc::Avg => "AVG",
};
let arg_str = match arg {
AggregateArg::Star => "*".to_string(),
AggregateArg::Expr(inner) => match inner.as_ref() {
Expr::Identifier(name) => name.clone(),
_ => format!("expr{}", ordinal),
},
};
format!("{}({})", func_name, arg_str)
}
Expr::Identifier(name) => name.clone(), Expr::Identifier(name) => name.clone(),
Expr::Literal(_) | Expr::Binary { .. } => format!("expr{}", ordinal), Expr::Literal(_) | Expr::Binary { .. } => format!("expr{}", ordinal),
} }
@ -566,6 +821,7 @@ mod tests {
alias: None, alias: None,
}], }],
selection: None, selection: None,
group_by: Vec::new(),
order_by: Vec::new(), order_by: Vec::new(),
limit: None, limit: None,
}; };

View File

@ -1,4 +1,5 @@
/// A parsed `SELECT-FROM-WHERE-ORDER BY-LIMIT` statement in the current SQL subset. /// A parsed `SELECT-FROM-WHERE-GROUP BY-ORDER BY-LIMIT` statement in the
/// current SQL subset.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct Select { pub struct Select {
/// Output expressions requested by the query. /// Output expressions requested by the query.
@ -7,6 +8,8 @@ pub struct Select {
pub from: Vec<TableRef>, pub from: Vec<TableRef>,
/// Optional filter predicate. /// Optional filter predicate.
pub selection: Option<Expr>, pub selection: Option<Expr>,
/// Grouping columns. Empty means no `GROUP BY` clause.
pub group_by: Vec<Expr>,
/// Optional output ordering. /// Optional output ordering.
pub order_by: Vec<OrderByItem>, pub order_by: Vec<OrderByItem>,
/// Optional row limit. /// Optional row limit.
@ -53,6 +56,36 @@ pub enum Expr {
op: BinaryOp, op: BinaryOp,
right: Box<Expr>, right: Box<Expr>,
}, },
/// An aggregate function applied to an argument.
Aggregate {
func: AggregateFunc,
arg: AggregateArg,
},
}
/// An aggregate function in the current SQL subset.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateFunc {
/// Row count (with `*`) or count of non-null values (with a column).
Count,
/// Sum of integer values.
Sum,
/// Minimum value.
Min,
/// Maximum value.
Max,
/// Arithmetic mean of integer values.
Avg,
}
/// The argument to an aggregate function: either `*` (only valid for
/// `COUNT`) or an expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AggregateArg {
/// `COUNT(*)` style argument.
Star,
/// An expression argument such as `SUM(col)`.
Expr(Box<Expr>),
} }
/// A SQL literal in the current subset. /// A SQL literal in the current subset.

View File

@ -2,7 +2,8 @@ use std::error::Error;
use std::fmt; use std::fmt;
use super::ast::{ use super::ast::{
BinaryOp, Expr, Literal, OrderByItem, Select, SelectItem, SortDirection, TableRef, AggregateArg, AggregateFunc, BinaryOp, Expr, Literal, OrderByItem, Select, SelectItem,
SortDirection, TableRef,
}; };
/// Errors returned by the minimal SQL parser. /// Errors returned by the minimal SQL parser.
@ -50,11 +51,14 @@ enum Token {
Desc, Desc,
Null, Null,
Limit, Limit,
Group,
Identifier(String), Identifier(String),
String(String), String(String),
Integer(usize), Integer(usize),
Star, Star,
Comma, Comma,
LParen,
RParen,
Eq, Eq,
Ne, Ne,
} }
@ -87,6 +91,13 @@ impl Parser {
} else { } else {
None None
}; };
let group_by = if self.peek() == Some(&Token::Group) {
self.index += 1;
self.expect_keyword(Token::By, "BY")?;
self.parse_group_by()?
} else {
Vec::new()
};
let order_by = if self.peek() == Some(&Token::Order) { let order_by = if self.peek() == Some(&Token::Order) {
self.index += 1; self.index += 1;
self.expect_keyword(Token::By, "BY")?; self.expect_keyword(Token::By, "BY")?;
@ -110,11 +121,25 @@ impl Parser {
projection, projection,
from, from,
selection, selection,
group_by,
order_by, order_by,
limit, limit,
}) })
} }
fn parse_group_by(&mut self) -> Result<Vec<Expr>, ParseError> {
let mut items = Vec::new();
loop {
items.push(self.parse_operand()?);
if self.peek() == Some(&Token::Comma) {
self.index += 1;
continue;
}
break;
}
Ok(items)
}
fn parse_projection(&mut self) -> Result<Vec<SelectItem>, ParseError> { fn parse_projection(&mut self) -> Result<Vec<SelectItem>, ParseError> {
let mut items = Vec::new(); let mut items = Vec::new();
@ -262,7 +287,13 @@ impl Parser {
fn parse_operand(&mut self) -> Result<Expr, ParseError> { fn parse_operand(&mut self) -> Result<Expr, ParseError> {
match self.next().ok_or(ParseError::UnexpectedEnd)? { match self.next().ok_or(ParseError::UnexpectedEnd)? {
Token::Identifier(name) => Ok(Expr::Identifier(name)), Token::Identifier(name) => {
if self.peek() == Some(&Token::LParen) {
self.parse_function_call(name)
} else {
Ok(Expr::Identifier(name))
}
}
Token::String(value) => Ok(Expr::Literal(Literal::String(value))), Token::String(value) => Ok(Expr::Literal(Literal::String(value))),
Token::Integer(n) => Ok(Expr::Literal(Literal::Integer(n as i64))), Token::Integer(n) => Ok(Expr::Literal(Literal::Integer(n as i64))),
Token::Null => Ok(Expr::Literal(Literal::Null)), Token::Null => Ok(Expr::Literal(Literal::Null)),
@ -270,6 +301,31 @@ impl Parser {
} }
} }
fn parse_function_call(&mut self, name: String) -> Result<Expr, ParseError> {
self.expect_keyword(Token::LParen, "(")?;
let func = match name.to_ascii_uppercase().as_str() {
"COUNT" => AggregateFunc::Count,
"SUM" => AggregateFunc::Sum,
"MIN" => AggregateFunc::Min,
"MAX" => AggregateFunc::Max,
"AVG" => AggregateFunc::Avg,
_ => return Err(ParseError::UnexpectedToken(name)),
};
let arg = if self.peek() == Some(&Token::Star) {
self.index += 1;
if !matches!(func, AggregateFunc::Count) {
return Err(ParseError::UnexpectedToken("*".to_string()));
}
AggregateArg::Star
} else {
AggregateArg::Expr(Box::new(self.parse_operand()?))
};
self.expect_keyword(Token::RParen, ")")?;
Ok(Expr::Aggregate { func, arg })
}
fn expect_keyword(&mut self, token: Token, label: &'static str) -> Result<(), ParseError> { fn expect_keyword(&mut self, token: Token, label: &'static str) -> Result<(), ParseError> {
let next = self.next().ok_or(ParseError::UnexpectedEnd)?; let next = self.next().ok_or(ParseError::UnexpectedEnd)?;
if next == token { if next == token {
@ -325,6 +381,14 @@ fn tokenize(input: &str) -> Result<Vec<Token>, ParseError> {
chars.next(); chars.next();
tokens.push(Token::Comma); tokens.push(Token::Comma);
} }
'(' => {
chars.next();
tokens.push(Token::LParen);
}
')' => {
chars.next();
tokens.push(Token::RParen);
}
'!' => { '!' => {
chars.next(); chars.next();
if chars.peek() == Some(&'=') { if chars.peek() == Some(&'=') {
@ -367,6 +431,7 @@ fn tokenize(input: &str) -> Result<Vec<Token>, ParseError> {
"DESC" => Token::Desc, "DESC" => Token::Desc,
"NULL" => Token::Null, "NULL" => Token::Null,
"LIMIT" => Token::Limit, "LIMIT" => Token::Limit,
"GROUP" => Token::Group,
_ => Token::Identifier(ident), _ => Token::Identifier(ident),
}; };
tokens.push(token); tokens.push(token);
@ -462,6 +527,9 @@ fn render_token(token: &Token) -> String {
Token::String(value) => format!("'{}'", value), Token::String(value) => format!("'{}'", value),
Token::Star => "*".to_string(), Token::Star => "*".to_string(),
Token::Comma => ",".to_string(), Token::Comma => ",".to_string(),
Token::LParen => "(".to_string(),
Token::RParen => ")".to_string(),
Token::Group => "GROUP".to_string(),
Token::Eq => "=".to_string(), Token::Eq => "=".to_string(),
Token::Ne => "!=".to_string(), Token::Ne => "!=".to_string(),
} }

View File

@ -360,3 +360,156 @@ fn execute_with_table_store_scans_in_memory_rows() {
assert_eq!(result.rows().len(), 1); assert_eq!(result.rows().len(), 1);
assert_eq!(format!("{}", result.rows()[0].values()[0]), "bob"); assert_eq!(format!("{}", result.rows()[0].values()[0]), "bob");
} }
#[test]
fn count_star_no_group_by() {
let instance = parent_instance();
let catalog = PredicateCatalog::from_instance(&instance).unwrap();
let select = parse_select("SELECT COUNT(*) FROM Parent").unwrap();
let plan = plan_select(&select, &catalog).unwrap();
let result = execute(&plan, &instance).unwrap();
assert_eq!(result.rows().len(), 1);
assert_eq!(format!("{}", result.rows()[0].values()[0]), "2");
}
#[test]
fn count_star_group_by_one_column() {
use query_engine::execution::TableStore;
use query_engine::relational::{DataType, Field, Row, Schema, Value};
let schema = Schema::new(vec![
Field::new("dept", DataType::Text, false),
Field::new("name", DataType::Text, false),
]);
let mut store = TableStore::new();
store.insert(
"Emp",
schema.clone(),
vec![
Row::new(vec![Value::text("eng"), Value::text("alice")]),
Row::new(vec![Value::text("eng"), Value::text("bob")]),
Row::new(vec![Value::text("sales"), Value::text("carol")]),
],
);
let mut catalog = PredicateCatalog::new();
catalog.register_table("Emp", schema);
let select = parse_select("SELECT dept, COUNT(*) FROM Emp GROUP BY dept").unwrap();
let plan = plan_select(&select, &catalog).unwrap();
let result = execute(&plan, &store).unwrap();
assert_eq!(result.rows().len(), 2);
let mut rows: Vec<(String, String)> = result
.rows()
.iter()
.map(|row| {
(
format!("{}", row.values()[0]),
format!("{}", row.values()[1]),
)
})
.collect();
rows.sort();
assert_eq!(
rows,
vec![
("eng".to_string(), "2".to_string()),
("sales".to_string(), "1".to_string()),
]
);
}
#[test]
fn sum_min_max_avg_over_integer_column() {
use query_engine::execution::TableStore;
use query_engine::relational::{DataType, Field, Row, Schema, Value};
let schema = Schema::new(vec![
Field::new("dept", DataType::Text, false),
Field::new("salary", DataType::Integer, false),
]);
let mut store = TableStore::new();
store.insert(
"Emp",
schema.clone(),
vec![
Row::new(vec![Value::text("eng"), Value::Integer(100)]),
Row::new(vec![Value::text("eng"), Value::Integer(200)]),
Row::new(vec![Value::text("sales"), Value::Integer(50)]),
],
);
let mut catalog = PredicateCatalog::new();
catalog.register_table("Emp", schema);
let select = parse_select(
"SELECT dept, SUM(salary), MIN(salary), MAX(salary), AVG(salary) FROM Emp GROUP BY dept",
)
.unwrap();
let plan = plan_select(&select, &catalog).unwrap();
let result = execute(&plan, &store).unwrap();
assert_eq!(result.rows().len(), 2);
let mut rows: Vec<(String, String, String, String, String)> = result
.rows()
.iter()
.map(|row| {
(
format!("{}", row.values()[0]),
format!("{}", row.values()[1]),
format!("{}", row.values()[2]),
format!("{}", row.values()[3]),
format!("{}", row.values()[4]),
)
})
.collect();
rows.sort();
assert_eq!(
rows[0],
(
"eng".to_string(),
"300".to_string(),
"100".to_string(),
"200".to_string(),
"150".to_string(),
)
);
assert_eq!(
rows[1],
(
"sales".to_string(),
"50".to_string(),
"50".to_string(),
"50".to_string(),
"50".to_string(),
)
);
}
#[test]
fn projection_not_in_group_by_errors() {
use query_engine::execution::TableStore;
use query_engine::relational::{DataType, Field, Schema};
let schema = Schema::new(vec![
Field::new("dept", DataType::Text, false),
Field::new("name", DataType::Text, false),
]);
let mut store = TableStore::new();
store.insert("Emp", schema.clone(), Vec::new());
let mut catalog = PredicateCatalog::new();
catalog.register_table("Emp", schema);
let select = parse_select("SELECT dept, name FROM Emp GROUP BY dept").unwrap();
let err = plan_select(&select, &catalog).unwrap_err();
assert!(
err.to_string()
.contains("not aggregated and not in GROUP BY")
);
}