//! Execution support for the current SQL slice. //! //! The executor evaluates a [`LogicalPlan`] against a [`DataSource`] that //! provides table scans. The built-in [`Instance`](crate::chase::Instance) //! adapter and the [`TableStore`] are the two provided implementations. pub mod physical; pub mod table_store; use std::cmp::Ordering; use std::error::Error; use std::fmt; use crate::chase::{Instance, Term}; use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; use crate::relational::{ResultSet, Row, Schema, Value}; pub use physical::{ NamedPhysicalExpr, PhysicalPlan, execute_physical, plan_physical, rewrite_physical, }; pub use table_store::TableStore; /// Errors returned by the current logical-plan executor. #[derive(Debug)] pub enum ExecutionError { /// A column reference could not be resolved. UnknownColumn(String), /// The scan layer encountered a variable term where a ground value was expected. NonGroundScanTerm, } impl fmt::Display for ExecutionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::UnknownColumn(column) => write!(f, "unknown column `{}`", column), Self::NonGroundScanTerm => { write!(f, "cannot scan non-ground terms into relational rows") } } } } impl Error for ExecutionError {} /// A source of relational data for the executor. /// /// Implementations provide table scans that return rows conforming to a given /// schema. The executor calls [`scan`](DataSource::scan) for each /// [`LogicalPlan::Scan`] node; all other operators work on the resulting /// [`ResultSet`] values. pub trait DataSource { /// Scan all rows for the named table, conforming to the provided schema. fn scan(&self, table: &str, schema: &Schema) -> Result; } impl DataSource for Instance { fn scan(&self, table: &str, schema: &Schema) -> Result { let mut rows = Vec::new(); for fact in self.facts_for_predicate(table) { let values = fact .terms .iter() .map(value_from_term) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } } /// Execute a logical plan against the provided data source. pub fn execute(plan: &LogicalPlan, source: &dyn DataSource) -> Result { match plan { LogicalPlan::Scan { table, schema } => source.scan(table, schema), LogicalPlan::CrossJoin { left, right, schema, } => { let left_result = execute(left, source)?; let right_result = execute(right, source)?; let mut rows = Vec::new(); for left_row in left_result.rows() { for right_row in right_result.rows() { let mut values = left_row.values().to_vec(); values.extend_from_slice(right_row.values()); rows.push(Row::new(values)); } } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Filter { input, predicate } => { let result = execute(input, source)?; let filtered_rows = result .rows() .iter() .filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false)) .cloned() .collect(); Ok(ResultSet::new(result.schema().clone(), filtered_rows)) } LogicalPlan::Project { input, expressions, schema, } => { let result = execute(input, source)?; let mut rows = Vec::new(); for row in result.rows() { let values = expressions .iter() .map(|expr| eval_expr(&expr.expr, row, result.schema())) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Sort { input, keys, schema, } => { let result = execute(input, source)?; let mut rows = result.rows().to_vec(); let resolved_keys = resolve_sort_keys(keys, result.schema())?; rows.sort_by(|left, right| compare_rows(left, right, &resolved_keys)); Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Limit { input, count } => { let result = execute(input, source)?; let rows = result.rows().iter().take(*count).cloned().collect(); Ok(ResultSet::new(result.schema().clone(), rows)) } } } fn eval_predicate( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)? .sql_eq(&eval_expr(right, row, schema)?) .unwrap_or(false)), LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)? .sql_eq(&eval_expr(right, row, schema)?) .map(|eq| !eq) .unwrap_or(false)), LogicalExpr::And(left, right) => { Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?) } LogicalExpr::Or(left, right) => { Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?) } _ => Ok(false), } } fn eval_expr( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Column(name) => { let index = schema .index_of(name) .ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?; Ok(row.get(index).cloned().unwrap_or(Value::Null)) } LogicalExpr::Literal(value) => Ok(value.clone()), LogicalExpr::Eq(left, right) => { let left = eval_expr(left, row, schema)?; let right = eval_expr(right, row, schema)?; Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false))) } LogicalExpr::Ne(left, right) => { let left = eval_expr(left, row, schema)?; let right = eval_expr(right, row, schema)?; Ok(Value::Boolean( left.sql_eq(&right).map(|eq| !eq).unwrap_or(false), )) } LogicalExpr::And(left, right) => Ok(Value::Boolean( eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?, )), LogicalExpr::Or(left, right) => Ok(Value::Boolean( eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?, )), } } fn value_from_term(term: &Term) -> Result { match term { Term::Constant(value) => Ok(Value::text(value.clone())), Term::Null(_) => Ok(Value::Null), Term::Variable(_) => Err(ExecutionError::NonGroundScanTerm), } } fn resolve_sort_keys( keys: &[SortKey], schema: &crate::relational::Schema, ) -> Result, ExecutionError> { keys.iter() .map(|key| { let index = schema .index_of(&key.column) .ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?; Ok((index, key.direction)) }) .collect() } fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering { for (index, direction) in keys { let left_value = left.get(*index).unwrap_or(&Value::Null); let right_value = right.get(*index).unwrap_or(&Value::Null); let ordering = compare_values(left_value, right_value); if ordering != Ordering::Equal { return match direction { SortDirection::Asc => ordering, SortDirection::Desc => ordering.reverse(), }; } } Ordering::Equal } fn compare_values(left: &Value, right: &Value) -> Ordering { match (left, right) { (Value::Null, Value::Null) => Ordering::Equal, (Value::Null, _) => Ordering::Greater, (_, Value::Null) => Ordering::Less, (Value::Text(left), Value::Text(right)) => left.cmp(right), (Value::Integer(left), Value::Integer(right)) => left.cmp(right), (Value::Boolean(left), Value::Boolean(right)) => left.cmp(right), // Cross-type ordering: Integer < Text < Boolean (Value::Integer(_), _) => Ordering::Less, (_, Value::Integer(_)) => Ordering::Greater, (Value::Text(_), Value::Boolean(_)) => Ordering::Less, (Value::Boolean(_), Value::Text(_)) => Ordering::Greater, } } #[cfg(test)] mod tests { use super::*; use crate::chase::{Atom, Term}; use crate::relational::{DataType, Field}; #[test] fn instance_datasource_scans_predicate_as_table() { let instance: Instance = vec![ Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], ), Atom::new( "Parent", vec![Term::constant("bob"), Term::constant("carol")], ), ] .into_iter() .collect(); let schema = Schema::new(vec![ Field::new("c0", DataType::Text, false), Field::new("c1", DataType::Text, false), ]); let result = instance.scan("Parent", &schema).unwrap(); assert_eq!(result.rows().len(), 2); assert_eq!(result.schema().len(), 2); } #[test] fn instance_datasource_returns_empty_for_unknown_predicate() { let instance = Instance::new(); let schema = Schema::new(vec![]); let result = instance.scan("Missing", &schema).unwrap(); assert_eq!(result.rows().len(), 0); } }