//! Minimal execution support for the first SQL slice. use std::cmp::Ordering; use std::error::Error; use std::fmt; use crate::chase::{Instance, Term}; use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey}; use crate::relational::{ResultSet, Row, Value}; /// Errors returned by the current logical-plan executor. #[derive(Debug)] pub enum ExecutionError { /// A column reference could not be resolved. UnknownColumn(String), /// The scan layer encountered a variable term where a ground value was expected. NonGroundScanTerm, } impl fmt::Display for ExecutionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::UnknownColumn(column) => write!(f, "unknown column `{}`", column), Self::NonGroundScanTerm => { write!(f, "cannot scan non-ground terms into relational rows") } } } } impl Error for ExecutionError {} /// Execute the current logical-plan subset against an instance-backed source. pub fn execute(plan: &LogicalPlan, instance: &Instance) -> Result { match plan { LogicalPlan::Scan { table, schema } => { let mut rows = Vec::new(); for fact in instance.facts_for_predicate(table) { let values = fact .terms .iter() .map(value_from_term) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::CrossJoin { left, right, schema, } => { let left_result = execute(left, instance)?; let right_result = execute(right, instance)?; let mut rows = Vec::new(); for left_row in left_result.rows() { for right_row in right_result.rows() { let mut values = left_row.values().to_vec(); values.extend_from_slice(right_row.values()); rows.push(Row::new(values)); } } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Filter { input, predicate } => { let result = execute(input, instance)?; let filtered_rows = result .rows() .iter() .filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false)) .cloned() .collect(); Ok(ResultSet::new(result.schema().clone(), filtered_rows)) } LogicalPlan::Project { input, expressions, schema, } => { let result = execute(input, instance)?; let mut rows = Vec::new(); for row in result.rows() { let values = expressions .iter() .map(|expr| eval_expr(&expr.expr, row, result.schema())) .collect::, _>>()?; rows.push(Row::new(values)); } Ok(ResultSet::new(schema.clone(), rows)) } LogicalPlan::Sort { input, keys, schema, } => { let result = execute(input, instance)?; let mut rows = result.rows().to_vec(); let resolved_keys = resolve_sort_keys(keys, result.schema())?; rows.sort_by(|left, right| compare_rows(left, right, &resolved_keys)); Ok(ResultSet::new(schema.clone(), rows)) } } } fn eval_predicate( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)? .sql_eq(&eval_expr(right, row, schema)?) .unwrap_or(false)), LogicalExpr::And(left, right) => { Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?) } _ => Ok(false), } } fn eval_expr( expr: &LogicalExpr, row: &Row, schema: &crate::relational::Schema, ) -> Result { match expr { LogicalExpr::Column(name) => { let index = schema .index_of(name) .ok_or_else(|| ExecutionError::UnknownColumn(name.clone()))?; Ok(row.get(index).cloned().unwrap_or(Value::Null)) } LogicalExpr::Literal(value) => Ok(value.clone()), LogicalExpr::Eq(left, right) => { let left = eval_expr(left, row, schema)?; let right = eval_expr(right, row, schema)?; Ok(Value::Boolean(left.sql_eq(&right).unwrap_or(false))) } LogicalExpr::And(left, right) => Ok(Value::Boolean( eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?, )), } } fn value_from_term(term: &Term) -> Result { match term { Term::Constant(value) => Ok(Value::text(value.clone())), Term::Null(_) => Ok(Value::Null), Term::Variable(_) => Err(ExecutionError::NonGroundScanTerm), } } fn resolve_sort_keys( keys: &[SortKey], schema: &crate::relational::Schema, ) -> Result, ExecutionError> { keys.iter() .map(|key| { let index = schema .index_of(&key.column) .ok_or_else(|| ExecutionError::UnknownColumn(key.column.clone()))?; Ok((index, key.direction)) }) .collect() } fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering { for (index, direction) in keys { let left_value = left.get(*index).unwrap_or(&Value::Null); let right_value = right.get(*index).unwrap_or(&Value::Null); let ordering = compare_values(left_value, right_value); if ordering != Ordering::Equal { return match direction { SortDirection::Asc => ordering, SortDirection::Desc => ordering.reverse(), }; } } Ordering::Equal } fn compare_values(left: &Value, right: &Value) -> Ordering { match (left, right) { (Value::Null, Value::Null) => Ordering::Equal, (Value::Null, _) => Ordering::Greater, (_, Value::Null) => Ordering::Less, (Value::Text(left), Value::Text(right)) => left.cmp(right), (Value::Boolean(left), Value::Boolean(right)) => left.cmp(right), (Value::Text(_), Value::Boolean(_)) => Ordering::Less, (Value::Boolean(_), Value::Text(_)) => Ordering::Greater, } }