285 lines
9.8 KiB
Rust
Raw Normal View History

//! Execution support for the current SQL slice.
//!
//! The executor evaluates a [`LogicalPlan`] against a [`DataSource`] that
//! provides table scans. The built-in [`Instance`](crate::chase::Instance)
//! adapter and the [`TableStore`] are the two provided implementations.
pub mod table_store;
2026-04-09 12:38:43 +02:00
2026-04-10 10:10:46 +02:00
use std::cmp::Ordering;
2026-04-09 12:38:43 +02:00
use std::error::Error;
use std::fmt;
use crate::chase::{Instance, Term};
2026-04-10 10:10:46 +02:00
use crate::planner::logical::{LogicalExpr, LogicalPlan, SortDirection, SortKey};
use crate::relational::{ResultSet, Row, Schema, Value};
pub use table_store::TableStore;
2026-04-09 12:38:43 +02:00
/// Errors returned by the current logical-plan executor.
///
/// Besides `Debug`, the type derives `Clone`, `PartialEq`, and `Eq` so that
/// callers and tests can compare a returned error against an expected
/// variant directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionError {
    /// A column reference could not be resolved.
    ///
    /// Carries the name of the column that was looked up.
    UnknownColumn(String),
    /// The scan layer encountered a variable term where a ground value was expected.
    NonGroundScanTerm,
}
impl fmt::Display for ExecutionError {
    /// Writes the human-readable message associated with each error variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Fixed message: no formatting arguments are needed.
            Self::NonGroundScanTerm => {
                f.write_str("cannot scan non-ground terms into relational rows")
            }
            Self::UnknownColumn(name) => write!(f, "unknown column `{}`", name),
        }
    }
}
// Marker impl: the body is empty, so `ExecutionError` uses the default
// `Error` trait methods.
impl Error for ExecutionError {}
/// A source of relational data for the executor.
///
/// Implementations provide table scans that return rows conforming to a given
/// schema. The executor calls [`scan`](DataSource::scan) for each
/// [`LogicalPlan::Scan`] node; all other operators work on the resulting
/// [`ResultSet`] values.
pub trait DataSource {
    /// Scan all rows for the named table, conforming to the provided schema.
    ///
    /// # Errors
    ///
    /// Returns an [`ExecutionError`] when the implementation cannot produce
    /// rows for the requested table.
    fn scan(&self, table: &str, schema: &Schema) -> Result<ResultSet, ExecutionError>;
}
impl DataSource for Instance {
fn scan(&self, table: &str, schema: &Schema) -> Result<ResultSet, ExecutionError> {
let mut rows = Vec::new();
for fact in self.facts_for_predicate(table) {
let values = fact
.terms
.iter()
.map(value_from_term)
.collect::<Result<Vec<_>, _>>()?;
rows.push(Row::new(values));
2026-04-09 12:38:43 +02:00
}
Ok(ResultSet::new(schema.clone(), rows))
}
}
/// Execute a logical plan against the provided data source.
pub fn execute(plan: &LogicalPlan, source: &dyn DataSource) -> Result<ResultSet, ExecutionError> {
match plan {
LogicalPlan::Scan { table, schema } => source.scan(table, schema),
LogicalPlan::CrossJoin {
left,
right,
schema,
} => {
let left_result = execute(left, source)?;
let right_result = execute(right, source)?;
let mut rows = Vec::new();
for left_row in left_result.rows() {
for right_row in right_result.rows() {
let mut values = left_row.values().to_vec();
values.extend_from_slice(right_row.values());
rows.push(Row::new(values));
}
}
Ok(ResultSet::new(schema.clone(), rows))
}
2026-04-09 12:38:43 +02:00
LogicalPlan::Filter { input, predicate } => {
let result = execute(input, source)?;
2026-04-09 12:38:43 +02:00
let filtered_rows = result
.rows()
.iter()
.filter(|row| eval_predicate(predicate, row, result.schema()).unwrap_or(false))
.cloned()
.collect();
Ok(ResultSet::new(result.schema().clone(), filtered_rows))
}
LogicalPlan::Project {
input,
expressions,
schema,
} => {
let result = execute(input, source)?;
2026-04-09 12:38:43 +02:00
let mut rows = Vec::new();
for row in result.rows() {
let values = expressions
.iter()
.map(|expr| eval_expr(&expr.expr, row, result.schema()))
.collect::<Result<Vec<_>, _>>()?;
rows.push(Row::new(values));
}
Ok(ResultSet::new(schema.clone(), rows))
}
2026-04-10 10:10:46 +02:00
LogicalPlan::Sort {
input,
keys,
schema,
} => {
let result = execute(input, source)?;
2026-04-10 10:10:46 +02:00
let mut rows = result.rows().to_vec();
let resolved_keys = resolve_sort_keys(keys, result.schema())?;
rows.sort_by(|left, right| compare_rows(left, right, &resolved_keys));
Ok(ResultSet::new(schema.clone(), rows))
}
LogicalPlan::Limit { input, count } => {
let result = execute(input, source)?;
let rows = result.rows().iter().take(*count).cloned().collect();
Ok(ResultSet::new(result.schema().clone(), rows))
}
2026-04-09 12:38:43 +02:00
}
}
fn eval_predicate(
expr: &LogicalExpr,
row: &Row,
schema: &crate::relational::Schema,
) -> Result<bool, ExecutionError> {
match expr {
LogicalExpr::Eq(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.unwrap_or(false)),
LogicalExpr::Ne(left, right) => Ok(eval_expr(left, row, schema)?
.sql_eq(&eval_expr(right, row, schema)?)
.map(|eq| !eq)
.unwrap_or(false)),
LogicalExpr::And(left, right) => {
Ok(eval_predicate(left, row, schema)? && eval_predicate(right, row, schema)?)
}
LogicalExpr::Or(left, right) => {
Ok(eval_predicate(left, row, schema)? || eval_predicate(right, row, schema)?)
}
2026-04-09 12:38:43 +02:00
_ => Ok(false),
}
}
/// Evaluate `expr` to a [`Value`] for a single row.
///
/// Literals yield a clone of their value. Column references are resolved
/// through `schema`; a resolved index with no corresponding value in `row`
/// yields `Value::Null`. Comparison and logical operators produce a
/// `Value::Boolean` carrying the result of `eval_predicate`.
///
/// # Errors
///
/// Returns [`ExecutionError::UnknownColumn`] when a column reference cannot be
/// resolved against `schema`.
fn eval_expr(
    expr: &LogicalExpr,
    row: &Row,
    schema: &crate::relational::Schema,
) -> Result<Value, ExecutionError> {
    match expr {
        LogicalExpr::Literal(value) => Ok(value.clone()),
        LogicalExpr::Column(name) => match schema.index_of(name) {
            Some(index) => Ok(match row.get(index) {
                Some(value) => value.clone(),
                None => Value::Null,
            }),
            None => Err(ExecutionError::UnknownColumn(name.clone())),
        },
        // The boolean operators share their evaluation rules with
        // `eval_predicate`; wrap its answer instead of repeating them.
        LogicalExpr::Eq(..)
        | LogicalExpr::Ne(..)
        | LogicalExpr::And(..)
        | LogicalExpr::Or(..) => eval_predicate(expr, row, schema).map(Value::Boolean),
    }
}
/// Convert a chase term into a relational [`Value`].
///
/// Constants become text values and null terms become `Value::Null`.
///
/// # Errors
///
/// Returns [`ExecutionError::NonGroundScanTerm`] for variable terms.
fn value_from_term(term: &Term) -> Result<Value, ExecutionError> {
    let value = match term {
        Term::Variable(_) => return Err(ExecutionError::NonGroundScanTerm),
        Term::Null(_) => Value::Null,
        Term::Constant(text) => Value::text(text.clone()),
    };
    Ok(value)
}
2026-04-10 10:10:46 +02:00
/// Translate sort keys into `(column index, direction)` pairs for `schema`.
///
/// The output preserves the order of `keys`.
///
/// # Errors
///
/// Returns [`ExecutionError::UnknownColumn`] for the first key whose column is
/// not present in `schema`.
fn resolve_sort_keys(
    keys: &[SortKey],
    schema: &crate::relational::Schema,
) -> Result<Vec<(usize, SortDirection)>, ExecutionError> {
    let mut resolved = Vec::with_capacity(keys.len());
    for key in keys {
        match schema.index_of(&key.column) {
            Some(position) => resolved.push((position, key.direction)),
            None => return Err(ExecutionError::UnknownColumn(key.column.clone())),
        }
    }
    Ok(resolved)
}
/// Order two rows by the given resolved sort keys.
///
/// Keys are consulted in sequence; the first key on which the rows differ
/// decides the result, reversed for descending keys. A missing value at a key
/// index is treated as `Value::Null`. Rows equal on every key compare equal.
fn compare_rows(left: &Row, right: &Row, keys: &[(usize, SortDirection)]) -> Ordering {
    keys.iter()
        .map(|(column, direction)| {
            let lhs = left.get(*column).unwrap_or(&Value::Null);
            let rhs = right.get(*column).unwrap_or(&Value::Null);
            let natural = compare_values(lhs, rhs);
            // Reversing `Equal` is still `Equal`, so the direction can be
            // applied before checking whether this key decides the order.
            match direction {
                SortDirection::Asc => natural,
                SortDirection::Desc => natural.reverse(),
            }
        })
        .find(|ordering| *ordering != Ordering::Equal)
        .unwrap_or(Ordering::Equal)
}
/// Total ordering over [`Value`]s used for sorting.
///
/// Values of the same type compare by their payload. Values of different
/// types compare by type: Integer < Text < Boolean, with `Null` ordered after
/// every non-null value.
fn compare_values(left: &Value, right: &Value) -> Ordering {
    /// Position of a value's type in the cross-type ordering.
    fn type_rank(value: &Value) -> u8 {
        match value {
            Value::Integer(_) => 0,
            Value::Text(_) => 1,
            Value::Boolean(_) => 2,
            Value::Null => 3,
        }
    }

    match (left, right) {
        (Value::Integer(lhs), Value::Integer(rhs)) => lhs.cmp(rhs),
        (Value::Text(lhs), Value::Text(rhs)) => lhs.cmp(rhs),
        (Value::Boolean(lhs), Value::Boolean(rhs)) => lhs.cmp(rhs),
        // Two nulls share a rank and compare equal; every remaining pair
        // mixes types and is decided by rank alone.
        _ => type_rank(left).cmp(&type_rank(right)),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chase::{Atom, Term};
    use crate::relational::{DataType, Field};

    /// Scanning a predicate that holds two binary facts yields two rows under
    /// the two-column schema passed to the scan.
    #[test]
    fn instance_datasource_scans_predicate_as_table() {
        let facts = vec![
            Atom::new(
                "Parent",
                vec![Term::constant("alice"), Term::constant("bob")],
            ),
            Atom::new(
                "Parent",
                vec![Term::constant("bob"), Term::constant("carol")],
            ),
        ];
        let instance: Instance = facts.into_iter().collect();

        let fields = vec![
            Field::new("c0", DataType::Text, false),
            Field::new("c1", DataType::Text, false),
        ];
        let schema = Schema::new(fields);

        let scanned = instance.scan("Parent", &schema).unwrap();

        assert_eq!(scanned.rows().len(), 2);
        assert_eq!(scanned.schema().len(), 2);
    }

    /// Scanning a predicate with no facts succeeds and returns no rows.
    #[test]
    fn instance_datasource_returns_empty_for_unknown_predicate() {
        let empty_instance = Instance::new();
        let empty_schema = Schema::new(vec![]);

        let scanned = empty_instance.scan("Missing", &empty_schema).unwrap();

        assert!(scanned.rows().is_empty());
    }
}