use std::collections::HashSet;
use std::error::Error;
use std::fmt;

use crate::catalog::{CatalogError, PredicateCatalog};
use crate::planner::logical::{
    LogicalExpr, LogicalPlan, NamedExpr, SortDirection as LogicalSortDirection, SortKey,
};
use crate::relational::{DataType, Field, Schema, Value};
use crate::sql::ast::{
    BinaryOp, Expr, Literal, OrderByItem, Select, SelectItem, SortDirection, TableRef,
};

/// Failure modes encountered while lowering a SQL AST into a [`LogicalPlan`].
#[derive(Debug)]
pub enum PlannerError {
    /// Resolving a `FROM` table through the catalog failed; wraps the underlying
    /// [`CatalogError`]. Also used to report an empty `FROM` list.
    Catalog(CatalogError),
    /// A column reference did not match any field of the schema it was resolved against.
    UnknownColumn(String),
    /// The same table name or alias was used for more than one `FROM` source.
    DuplicateSourceName(String),
    /// An `ORDER BY` key was something other than a bare output column name.
    UnsupportedOrderBy,
}

impl fmt::Display for PlannerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PlannerError::Catalog(inner) => write!(f, "catalog error: {}", inner),
            PlannerError::UnknownColumn(name) => write!(f, "unknown column `{}`", name),
            PlannerError::DuplicateSourceName(name) => {
                write!(f, "source name `{}` appears more than once", name)
            }
            PlannerError::UnsupportedOrderBy => {
                f.write_str("only output column names are supported in ORDER BY")
            }
        }
    }
}

impl Error for PlannerError {
    /// Only catalog failures carry an underlying cause; every other variant is a leaf error.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            PlannerError::Catalog(inner) => Some(inner),
            PlannerError::UnknownColumn(_)
            | PlannerError::DuplicateSourceName(_)
            | PlannerError::UnsupportedOrderBy => None,
        }
    }
}

/// Lets `?` lift catalog lookups straight into planner results.
impl From<CatalogError> for PlannerError {
    fn from(err: CatalogError) -> Self {
        PlannerError::Catalog(err)
    }
}

/// Plans a parsed `SELECT` statement into the currently supported logical-plan subset.
pub fn plan_select( select: &Select, catalog: &PredicateCatalog, ) -> Result { let (mut plan, input_schema) = plan_from_tables(&select.from, catalog)?; if let Some(selection) = &select.selection { let predicate = plan_expr(selection, &input_schema)?; plan = LogicalPlan::Filter { input: Box::new(plan), predicate, }; } if is_wildcard_projection(&select.projection) { let output_schema = plan.output_schema().clone(); return maybe_apply_sort(plan, output_schema, &select.order_by); } let mut expressions = Vec::new(); let mut fields = Vec::new(); for (index, item) in select.projection.iter().enumerate() { match item { SelectItem::Expr { expr, alias } => { let planned_expr = plan_expr(expr, &input_schema)?; let output_name = alias .clone() .unwrap_or_else(|| default_projection_name(expr, index + 1)); let (data_type, nullable) = projection_metadata(expr, &input_schema)?; expressions.push(NamedExpr { name: output_name.clone(), expr: planned_expr, }); fields.push(Field::new(output_name, data_type, nullable)); } SelectItem::Wildcard => unreachable!("wildcard projections are handled earlier"), } } let plan = LogicalPlan::Project { input: Box::new(plan), expressions, schema: Schema::new(fields), }; let output_schema = plan.output_schema().clone(); maybe_apply_sort(plan, output_schema, &select.order_by) } fn is_wildcard_projection(items: &[SelectItem]) -> bool { matches!(items, [SelectItem::Wildcard]) } fn plan_from_tables( tables: &[TableRef], catalog: &PredicateCatalog, ) -> Result<(LogicalPlan, Schema), PlannerError> { let mut seen = HashSet::new(); let mut table_iter = tables.iter(); let first = table_iter.next().ok_or_else(|| { PlannerError::Catalog(CatalogError::UnknownTable("".to_string())) })?; let first_name = source_name(first); if !seen.insert(first_name.clone()) { return Err(PlannerError::DuplicateSourceName(first_name)); } let first_schema = input_schema_for_table(first, catalog, should_qualify_columns(first, tables))?; let mut plan = LogicalPlan::Scan { table: 
first.name.clone(), schema: first_schema.clone(), }; let mut combined_schema = first_schema; for table in table_iter { let qualified_name = source_name(table); if !seen.insert(qualified_name.clone()) { return Err(PlannerError::DuplicateSourceName(qualified_name)); } let right_schema = input_schema_for_table(table, catalog, should_qualify_columns(table, tables))?; let join_schema = combine_schemas(&combined_schema, &right_schema); let right_plan = LogicalPlan::Scan { table: table.name.clone(), schema: right_schema.clone(), }; plan = LogicalPlan::CrossJoin { left: Box::new(plan), right: Box::new(right_plan), schema: join_schema.clone(), }; combined_schema = join_schema; } Ok((plan, combined_schema)) } fn plan_expr(expr: &Expr, schema: &Schema) -> Result { match expr { Expr::Identifier(name) => { if schema.index_of(name).is_none() { return Err(PlannerError::UnknownColumn(name.clone())); } Ok(LogicalExpr::Column(name.clone())) } Expr::Literal(literal) => Ok(LogicalExpr::Literal(plan_literal(literal))), Expr::Binary { left, op, right } => match op { BinaryOp::Eq => Ok(LogicalExpr::Eq( Box::new(plan_expr(left, schema)?), Box::new(plan_expr(right, schema)?), )), BinaryOp::And => Ok(LogicalExpr::And( Box::new(plan_expr(left, schema)?), Box::new(plan_expr(right, schema)?), )), }, } } fn maybe_apply_sort( plan: LogicalPlan, schema: Schema, order_by: &[OrderByItem], ) -> Result { if order_by.is_empty() { return Ok(plan); } let mut keys = Vec::new(); for item in order_by { let column = match &item.expr { Expr::Identifier(name) => name.clone(), _ => return Err(PlannerError::UnsupportedOrderBy), }; if schema.index_of(&column).is_none() { return Err(PlannerError::UnknownColumn(column)); } keys.push(SortKey { column, direction: match item.direction { SortDirection::Asc => LogicalSortDirection::Asc, SortDirection::Desc => LogicalSortDirection::Desc, }, }); } Ok(LogicalPlan::Sort { input: Box::new(plan), keys, schema, }) } fn plan_literal(literal: &Literal) -> Value { match literal 
{ Literal::String(value) => Value::text(value.clone()), Literal::Null => Value::Null, } } fn projection_metadata(expr: &Expr, schema: &Schema) -> Result<(DataType, bool), PlannerError> { match expr { Expr::Identifier(name) => { let index = schema .index_of(name) .ok_or_else(|| PlannerError::UnknownColumn(name.clone()))?; let field = &schema.fields()[index]; Ok((field.data_type().clone(), field.nullable())) } Expr::Literal(Literal::String(_)) => Ok((DataType::Text, false)), Expr::Literal(Literal::Null) => Ok((DataType::Text, true)), Expr::Binary { .. } => Ok((DataType::Boolean, true)), } } fn default_projection_name(expr: &Expr, ordinal: usize) -> String { match expr { Expr::Identifier(name) => name.clone(), Expr::Literal(_) | Expr::Binary { .. } => format!("expr{}", ordinal), } } fn input_schema_for_table( table: &TableRef, catalog: &PredicateCatalog, qualify_columns: bool, ) -> Result { let schema = catalog.schema_for(&table.name)?.clone(); if !qualify_columns { return Ok(schema); } let qualifier = source_name(table); let fields = schema .fields() .iter() .map(|field| { Field::new( format!("{}.{}", qualifier, field.name()), field.data_type().clone(), field.nullable(), ) }) .collect(); Ok(Schema::new(fields)) } fn should_qualify_columns(table: &TableRef, tables: &[TableRef]) -> bool { table.alias.is_some() || tables.len() > 1 } fn source_name(table: &TableRef) -> String { table.alias.clone().unwrap_or_else(|| table.name.clone()) } fn combine_schemas(left: &Schema, right: &Schema) -> Schema { let mut fields = left.fields().to_vec(); fields.extend_from_slice(right.fields()); Schema::new(fields) } #[cfg(test)] mod tests { use super::*; use crate::catalog::PredicateCatalog; use crate::chase::{Atom, Instance, Term}; use crate::sql::parser::parse_select; #[test] fn plans_projection_and_filter() { let instance: Instance = vec![Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], )] .into_iter() .collect(); let catalog = 
PredicateCatalog::from_instance(&instance).unwrap(); let select = parse_select("SELECT c0 FROM Parent WHERE c1 = 'bob'").unwrap(); let plan = plan_select(&select, &catalog).unwrap(); assert_eq!(plan.output_schema().len(), 1); } #[test] fn plans_aliases_and_literal_projection() { let instance: Instance = vec![Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], )] .into_iter() .collect(); let catalog = PredicateCatalog::from_instance(&instance).unwrap(); let select = parse_select("SELECT c0 AS parent_name, 'seed' AS label, NULL FROM Parent").unwrap(); let plan = plan_select(&select, &catalog).unwrap(); let schema = plan.output_schema(); assert_eq!(schema.len(), 3); assert_eq!(schema.fields()[0].name(), "parent_name"); assert_eq!(schema.fields()[1].name(), "label"); assert_eq!(schema.fields()[2].name(), "expr3"); assert_eq!(schema.fields()[1].data_type(), &DataType::Text); } #[test] fn plans_multi_table_select_with_qualified_columns() { let instance: Instance = vec![ Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], ), Atom::new( "Ancestor", vec![Term::constant("bob"), Term::constant("carol")], ), ] .into_iter() .collect(); let mut catalog = PredicateCatalog::from_instance(&instance).unwrap(); catalog .rename_columns("Parent", ["parent", "child"]) .unwrap(); catalog .rename_columns("Ancestor", ["parent", "child"]) .unwrap(); let select = parse_select( "SELECT Parent.parent, Ancestor.child FROM Parent, Ancestor \ WHERE Parent.child = Ancestor.parent", ) .unwrap(); let plan = plan_select(&select, &catalog).unwrap(); let schema = plan.output_schema(); assert_eq!(schema.len(), 2); assert_eq!(schema.fields()[0].name(), "Parent.parent"); assert_eq!(schema.fields()[1].name(), "Ancestor.child"); } #[test] fn plans_self_join_with_table_aliases() { let instance: Instance = vec![ Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], ), Atom::new( "Parent", vec![Term::constant("bob"), Term::constant("carol")], 
), ] .into_iter() .collect(); let mut catalog = PredicateCatalog::from_instance(&instance).unwrap(); catalog .rename_columns("Parent", ["parent", "child"]) .unwrap(); let select = parse_select( "SELECT p.parent, q.child FROM Parent AS p, Parent AS q \ WHERE p.child = q.parent", ) .unwrap(); let plan = plan_select(&select, &catalog).unwrap(); let schema = plan.output_schema(); assert_eq!(schema.len(), 2); assert_eq!(schema.fields()[0].name(), "p.parent"); assert_eq!(schema.fields()[1].name(), "q.child"); } #[test] fn plans_single_table_with_alias() { let instance: Instance = vec![Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], )] .into_iter() .collect(); let mut catalog = PredicateCatalog::from_instance(&instance).unwrap(); catalog .rename_columns("Parent", ["parent", "child"]) .unwrap(); let select = parse_select("SELECT p.parent FROM Parent AS p WHERE p.child = 'bob'").unwrap(); let plan = plan_select(&select, &catalog).unwrap(); let schema = plan.output_schema(); assert_eq!(schema.len(), 1); assert_eq!(schema.fields()[0].name(), "p.parent"); } #[test] fn plans_conjunctive_filter() { let instance: Instance = vec![Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], )] .into_iter() .collect(); let catalog = PredicateCatalog::from_instance(&instance).unwrap(); let select = parse_select("SELECT c0 FROM Parent WHERE c1 = 'bob' AND c0 = 'alice'").unwrap(); let plan = plan_select(&select, &catalog).unwrap(); match plan { LogicalPlan::Project { input, .. } => match *input { LogicalPlan::Filter { predicate, .. 
} => { assert!(matches!(predicate, LogicalExpr::And(_, _))); } other => panic!("unexpected input plan: {:?}", other), }, other => panic!("unexpected plan: {:?}", other), } } #[test] fn plans_order_by_after_projection() { let instance: Instance = vec![ Atom::new( "Parent", vec![Term::constant("alice"), Term::constant("bob")], ), Atom::new( "Parent", vec![Term::constant("bob"), Term::constant("carol")], ), ] .into_iter() .collect(); let catalog = PredicateCatalog::from_instance(&instance).unwrap(); let select = parse_select("SELECT c0 FROM Parent ORDER BY c0 DESC").unwrap(); let plan = plan_select(&select, &catalog).unwrap(); match plan { LogicalPlan::Sort { keys, input, .. } => { assert_eq!(keys.len(), 1); assert_eq!(keys[0].column, "c0"); assert!(matches!(keys[0].direction, LogicalSortDirection::Desc)); assert!(matches!(*input, LogicalPlan::Project { .. })); } other => panic!("unexpected plan: {:?}", other), } } }