//! Execution support for the current SQL slice. //! //! The executor evaluates a [`LogicalPlan`] against a [`DataSource`] that //! provides table scans. The built-in [`Instance`](crate::chase::Instance) //! adapter and the [`TableStore`] are the two provided implementations. pub mod physical; pub mod table_store; use std::cmp::Ordering; use std::error::Error; use std::fmt; use crate::chase::{Instance, Term}; use crate::planner::logical::{ AggregateExpr as PlanAggregateExpr, LogicalExpr, LogicalPlan, SortDirection, SortKey, }; use crate::relational::{ResultSet, Row, Schema, Value}; use crate::sql::ast::AggregateFunc; pub use physical::{ NamedPhysicalExpr, PhysicalPlan, execute_physical, plan_physical, rewrite_physical, }; pub use table_store::TableStore; /// Errors returned by the current logical-plan executor. #[derive(Debug)] pub enum ExecutionError { /// A column reference could not be resolved. UnknownColumn(String), /// The scan layer encountered a variable term where a ground value was expected. NonGroundScanTerm, } impl fmt::Display for ExecutionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::UnknownColumn(column) => write!(f, "unknown column `{}`", column), Self::NonGroundScanTerm => { write!(f, "cannot scan non-ground terms into relational rows") } } } } impl Error for ExecutionError {} /// A source of relational data for the executor. /// /// Implementations provide table scans that return rows conforming to a given /// schema. The executor calls [`scan`](DataSource::scan) for each /// [`LogicalPlan::Scan`] node; all other operators work on the resulting /// [`ResultSet`] values. pub trait DataSource { /// Scan all rows for the named table, conforming to the provided schema. fn scan(&self, table: &str, schema: &Schema) -> Result; } impl DataSource for Instance { fn scan(&self, table: &str, schema: &Schema) -> Result { let mut rows = Vec::new(); for fact in self.facts_for_predicate(table) { let values = fact .terms .iter() .map(value_from_term) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } } /// Execute a logical plan against the provided data source. pub fn execute(plan: &LogicalPlan, source: &dyn DataSource) -> Result { match plan { LogicalPlan::Scan { table, schema } => source.scan(table, schema), LogicalPlan::CrossJoin { left, right, schema, } => { let left_result = execute(left, source)?; let right_result = execute(right, source)?; let mut rows = Vec::new(); for left_row in left_result.rows() { for right_row in right_result.rows() { let mut values = left_row.values().to_vec(); values.extend_from_slice(right_row.values()); rows.push(Row::new(values)); } } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Filter { input, predicate } => { let result = execute(input, source)?; let filtered_rows = result .rows() .iter() .filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false)) .cloned() .collect(); Ok(ResultSet::new(result.schema().clone(), filtered_rows)) } LogicalPlan::Project { input, expressions, schema, } => { let result = execute(input, source)?; let mut rows = Vec::new(); for row in result.rows() { let values = expressions .iter() .map(|expr| eval_expr(&expr.expr, row, result.schema())) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Sort { input, keys, schema, } => { let result = execute(input, source)?; let mut rows = result.rows().to_vec(); let resolved_keys = resolve_sort_keys(keys, result.schema())?; rows.sort_by(|left, right| compare_rows(left, right, &resolved_keys)); Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Limit { input, count } => { let result = execute(input, source)?; let rows = result.rows().iter().take(*count).cloned().collect(); Ok(ResultSet::new(result.schema().clone(), rows)) } LogicalPlan::Aggregate { input, group_by, aggregates, schema, } => { let result = execute(input, source)?; let rows = compute_aggregate(result.rows(), result.schema(), group_by, aggregates)?; Ok(ResultSet::new(schema.clone(), rows)) } } } /// Evaluate group-by + aggregates over a row set, returning one output row /// per distinct group key. The output row layout is: group_by column values /// followed by aggregate output values. pub(crate) fn compute_aggregate( rows: &[Row], input_schema: &Schema, group_by: &[String], aggregates: &[PlanAggregateExpr], ) -> Result, ExecutionError> { let group_indexes = group_by .iter() .map(|name| { input_schema .index_of(name) .ok_or_else(|| ExecutionError::UnknownColumn(name.clone())) }) .collect::, _>>()?; // Each aggregate holds an optional input column index (None means COUNT(*)). let agg_indexes = aggregates .iter() .map(|agg| { agg.arg .as_ref() .map(|col| { input_schema .index_of(col) .ok_or_else(|| ExecutionError::UnknownColumn(col.clone())) }) .transpose() }) .collect::>, _>>()?; // Preserve first-seen group order so single-group output is deterministic. let mut order: Vec> = Vec::new(); let mut groups: std::collections::HashMap, Vec> = std::collections::HashMap::new(); for row in rows { let key: Vec = group_indexes .iter() .map(|i| row.get(*i).cloned().unwrap_or(Value::Null)) .collect(); let states = groups.entry(key.clone()).or_insert_with(|| { order.push(key.clone()); aggregates .iter() .map(|agg| AggregateState::new(agg.func)) .collect() }); for (state, index_opt) in states.iter_mut().zip(agg_indexes.iter()) { let value = match index_opt { Some(i) => row.get(*i).cloned().unwrap_or(Value::Null), None => Value::Null, // COUNT(*) observes each row }; state.observe(&value, index_opt.is_none()); } } // If the user wrote an aggregate with no GROUP BY and no input rows, we // still need one output row (all-null plus zero counts). if rows.is_empty() && group_by.is_empty() && !aggregates.is_empty() { order.push(Vec::new()); groups.insert( Vec::new(), aggregates .iter() .map(|agg| AggregateState::new(agg.func)) .collect(), ); } let mut out_rows = Vec::new(); for key in order { let states = groups.remove(&key).unwrap_or_default(); let mut values = key; for state in &states { values.push(state.finalize()); } out_rows.push(Row::new(values)); } Ok(out_rows) } #[derive(Debug)] pub(crate) enum AggregateState { Count(i64), Sum(Option), Min(Option), Max(Option), Avg { sum: i64, count: i64 }, } impl AggregateState { pub(crate) fn new(func: AggregateFunc) -> Self { match func { AggregateFunc::Count => Self::Count(0), AggregateFunc::Sum => Self::Sum(None), AggregateFunc::Min => Self::Min(None), AggregateFunc::Max => Self::Max(None), AggregateFunc::Avg => Self::Avg { sum: 0, count: 0 }, } } pub(crate) fn observe(&mut self, value: &Value, is_count_star: bool) { match self { Self::Count(c) => { if is_count_star || !matches!(value, Value::Null) { *c += 1; } } Self::Sum(total) => { if let Value::Integer(n) = value { *total = Some(total.unwrap_or(0) + n); } } Self::Min(current) => { if !matches!(value, Value::Null) { match current { None => *current = Some(value.clone()), Some(existing) => { if compare_values_for_agg(value, existing) == std::cmp::Ordering::Less { *existing = value.clone(); } } } } } Self::Max(current) => { if !matches!(value, Value::Null) { match current { None => *current = Some(value.clone()), Some(existing) => { if compare_values_for_agg(value, existing) == std::cmp::Ordering::Greater { *existing = value.clone(); } } } } } Self::Avg { sum, count } => { if let Value::Integer(n) = value { *sum += n; *count += 1; } } } } pub(crate) fn finalize(&self) -> Value { match self { Self::Count(c) => Value::Integer(*c), Self::Sum(total) => total.map(Value::Integer).unwrap_or(Value::Null), Self::Min(v) | Self::Max(v) => v.clone().unwrap_or(Value::Null), Self::Avg { sum, count } => { if *count == 0 { Value::Null } else { Value::Integer(sum / count) } } } } } fn compare_values_for_agg(left: &Value, right: &Value) -> std::cmp::Ordering { match (left, right) { (Value::Integer(a), Value::Integer(b)) => a.cmp(b), (Value::Text(a), Value::Text(b)) => a.cmp(b), (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b), _ => std::cmp::Ordering::Equal, } } fn eval_predicate( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)? .sql_eq(&eval_expr(right, row, schema)?) .unwrap_or(false)), LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)? .sql_eq(&eval_expr(right, row, schema)?) .map(|eq| !eq) .unwrap_or(false)), LogicalExpr::And(left, right) => { Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?) } LogicalExpr::Or(left, right) => { Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?) } _ => Ok(false), } } fn eval_expr( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Column(name) => { let index = schema .index_of(name) .ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?; Ok(row.get(index).cloned().unwrap_or(Value::Null)) } LogicalExpr::Literal(value) => Ok(value.clone()), LogicalExpr::Eq(left, right) => { let left = eval_expr(left, row, schema)?; let right = eval_expr(right, row, schema)?; Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false))) } LogicalExpr::Ne(left, right) => { let left = eval_expr(left, row, schema)?; let right = eval_expr(right, row, schema)?; Ok(Value::Boolean( left.sql_eq(&right).map(|eq| !eq).unwrap_or(false), )) } LogicalExpr::And(left, right) => Ok(Value::Boolean( eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?, )), LogicalExpr::Or(left, right) => Ok(Value::Boolean( eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?, )), } } fn value_from_term(term: &Term) -> Result { match term { Term::Constant(value) => { // Try to interpret the constant as an integer so numeric // aggregates (SUM, AVG) work on chase-backed data. if let Ok(n) = value.parse::() { Ok(Value::Integer(n)) } else { Ok(Value::text(value.clone())) } } Term::Null(_) => Ok(Value::Null), Term::Variable(_) => Err(ExecutionError::NonGroundScanTerm), } } fn resolve_sort_keys( keys: &[SortKey], schema: &crate::relational::Schema, ) -> Result, ExecutionError> { keys.iter() .map(|key| { let index = schema .index_of(&key.column) .ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?; Ok((index, key.direction)) }) .collect() } fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering { for (index, direction) in keys { let left_value = left.get(*index).unwrap_or(&Value::Null); let right_value = right.get(*index).unwrap_or(&Value::Null); let ordering = compare_values(left_value, right_value); if ordering != Ordering::Equal { return match direction { SortDirection::Asc => ordering, SortDirection::Desc => ordering.reverse(), }; } } Ordering::Equal } fn compare_values(left: &Value, right: &Value) -> Ordering { match (left, right) { (Value::Null, Value::Null) => Ordering::Equal, (Value::Null, _) => Ordering::Greater, (_, Value::Null) => Ordering::Less, (Value::Text(left), Value::Text(right)) => left.cmp(right), (Value::Integer(left), Value::Integer(right)) => left.cmp(right), (Value::Boolean(left), Value::Boolean(right)) => left.cmp(right), // Cross-type ordering: Integer < Text < Boolean (Value::Integer(_), _) => Ordering::Less, (_, Value::Integer(_)) => Ordering::Greater, (Value::Text(_), Value::Boolean(_)) => Ordering::Less, (Value::Boolean(_), Value::Text(_)) => Ordering::Greater, } } #[cfg(test)] mod tests { use super::*; use crate::chase::{Atom, Term}; use crate::relational::{DataType, Field}; #[test] fn instance_datasource_scans_predicate_as_table() { let instance: Instance = vec![ Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], ), Atom::new( "Parent", vec![Term::constant("bob"), Term::constant("carol")], ), ] .into_iter() .collect(); let schema = Schema::new(vec![ Field::new("c0", DataType::Text, false), Field::new("c1", DataType::Text, false), ]); let result = instance.scan("Parent", &schema).unwrap(); assert_eq!(result.rows().len(), 2); assert_eq!(result.schema().len(), 2); } #[test] fn instance_datasource_returns_empty_for_unknown_predicate() { let instance = Instance::new(); let schema = Schema::new(vec![]); let result = instance.scan("Missing", &schema).unwrap(); assert_eq!(result.rows().len(), 0); } }