# Physical Plans and Operators

A reference for how a logical query plan becomes executable runtime work.

---

## Short answer

The physical plan is the runtime form of the query.

Logical plans say:

- what operations are required

Physical plans say:

- which executable operators will run
- what inputs they consume
- what batches they produce
- how logical expressions are turned into evaluable runtime expressions

In the companion engine, the physical layer is intentionally direct:

- each logical node maps to a concrete executable operator
- execution produces `Sequence<RecordBatch>`
- operators are mostly batch-oriented and columnar

That makes the logical-to-physical boundary easy to study.

---

## The physical planning pipeline

Relevant implementation areas:

- query planning from logical to physical operators
- the physical-plan interface
- scan execution
- selection execution
- projection execution
- hash aggregation
- hash join
- limit execution

Conceptually:

```text
logical plan
-> choose concrete executable operators
-> translate logical expressions into physical expressions
-> execute operators over record batches
```

---

## What a physical plan is

The `PhysicalPlan` interface defines three core methods:

- `schema()`: the shape of the produced output
- `execute()`: run the operator and emit `RecordBatch` values
- `children()`: the operator's inputs

This is a clean and important boundary. The physical layer is no longer about parsing or language semantics; it is about executable dataflow over batches.

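A minimal Python sketch of the same three-method contract is shown below. It assumes a toy `RecordBatch` built from plain Python lists in place of Arrow vectors. A generator stands in for Kotlin's lazy `Sequence<RecordBatch>`. `MemoryScanExec` and `explain` are hypothetical helpers, not part of the companion engine.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator


@dataclass
class RecordBatch:
    schema: list[str]      # column names (stand-in for an Arrow schema)
    columns: list[list]    # one list per column (stand-in for Arrow vectors)


class PhysicalPlan(ABC):
    """Mirror of the schema() / execute() / children() contract."""

    @abstractmethod
    def schema(self) -> list[str]:
        """Shape of the batches this operator produces."""

    @abstractmethod
    def execute(self) -> Iterator[RecordBatch]:
        """Run the operator, lazily yielding output batches."""

    @abstractmethod
    def children(self) -> list["PhysicalPlan"]:
        """Input operators; empty for leaf operators such as scans."""


class MemoryScanExec(PhysicalPlan):
    """Hypothetical leaf operator that replays pre-built batches."""

    def __init__(self, schema: list[str], batches: list[RecordBatch]):
        self._schema = schema
        self._batches = batches

    def schema(self) -> list[str]:
        return self._schema

    def execute(self) -> Iterator[RecordBatch]:
        yield from self._batches

    def children(self) -> list[PhysicalPlan]:
        return []


def explain(plan: PhysicalPlan, indent: int = 0) -> str:
    """Render the operator tree by walking children() recursively."""
    lines = ["  " * indent + f"{type(plan).__name__} {plan.schema()}"]
    for child in plan.children():
        lines.append(explain(child, indent + 1))
    return "\n".join(lines)
```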
---

## The planner's job

`QueryPlanner.createPhysicalPlan(plan)` recursively maps each logical operator to a runtime operator.

The mappings are intentionally readable:

- `Scan` -> `ScanExec`
- `Selection` -> `SelectionExec`
- `Projection` -> `ProjectionExec`
- `Aggregate` -> `HashAggregateExec`
- `Join` -> `HashJoinExec`
- `Limit` -> `LimitExec`

At the same time, it translates logical expressions into runtime expressions.

This means physical planning has two distinct jobs:

1. choose operator implementations
2. lower logical expressions into executable expression evaluators

That split is worth keeping explicit in any engine design.

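The recursive mapping can be sketched in Python as follows. The logical and physical node classes are hypothetical stand-ins that carry only the fields the mapping needs. `lower_expr` is a placeholder for job 2 (expression lowering) and is left as the identity so that job 1 (operator choice) stays in focus. Aggregate and join are omitted for brevity.

```python
from dataclasses import dataclass


# Hypothetical logical nodes.
@dataclass
class Scan:
    table: str
    projection: list[str]


@dataclass
class Selection:
    child: object
    predicate: object


@dataclass
class Projection:
    child: object
    exprs: list


@dataclass
class Limit:
    child: object
    n: int


# Hypothetical physical operators, one per logical node type.
@dataclass
class ScanExec:
    table: str
    projection: list[str]


@dataclass
class SelectionExec:
    child: object
    predicate: object


@dataclass
class ProjectionExec:
    child: object
    exprs: list


@dataclass
class LimitExec:
    child: object
    n: int


def lower_expr(expr):
    # Placeholder for job 2 (expression lowering); identity in this sketch.
    return expr


def create_physical_plan(plan):
    """Job 1: recursively pick one executable operator per logical node."""
    if isinstance(plan, Scan):
        return ScanExec(plan.table, plan.projection)
    if isinstance(plan, Selection):
        return SelectionExec(create_physical_plan(plan.child), lower_expr(plan.predicate))
    if isinstance(plan, Projection):
        return ProjectionExec(create_physical_plan(plan.child),
                              [lower_expr(e) for e in plan.exprs])
    if isinstance(plan, Limit):
        return LimitExec(create_physical_plan(plan.child), plan.n)
    raise NotImplementedError(f"no physical operator for {type(plan).__name__}")
```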
---

## Scan execution

`ScanExec` is the leaf physical operator.

Its job is simple:

- expose the projected schema
- delegate scanning to the underlying `DataSource`
- rely on the source to produce batches

This is also where projection pushdown becomes concrete. The operator receives a list of projected columns and passes that projection down to the source, so the source can avoid reading columns the query never uses.

So even though `ScanExec` is small, it carries important architectural meaning:

- it is the storage boundary
- it is where source capabilities first affect runtime behavior
- it is where the physical plan begins consuming external data

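Below is a Python sketch of a scan with projection pushdown. Several pieces are assumptions made for illustration: the `DataSource` protocol, the `InMemorySource` class, and the convention that an empty projection means "all columns". A real CSV or Parquet source would skip unrequested columns at read time rather than slice in-memory lists.

```python
from dataclasses import dataclass
from typing import Iterator, Protocol


@dataclass
class RecordBatch:
    schema: list[str]
    columns: list[list]


class DataSource(Protocol):
    """Hypothetical source contract: the source does the actual reading."""
    def schema(self) -> list[str]: ...
    def scan(self, projection: list[str]) -> Iterator[RecordBatch]: ...


class InMemorySource:
    """Toy source: touches only the requested columns, in fixed-size batches."""

    def __init__(self, data: dict[str, list], batch_size: int):
        self.data = data
        self.batch_size = batch_size

    def schema(self) -> list[str]:
        return list(self.data)

    def scan(self, projection: list[str]) -> Iterator[RecordBatch]:
        cols = projection or self.schema()   # assumption: empty projection = all columns
        n = len(self.data[cols[0]])
        for start in range(0, n, self.batch_size):
            end = start + self.batch_size
            yield RecordBatch(cols, [self.data[c][start:end] for c in cols])


class ScanExec:
    """Leaf operator: exposes the projected schema and delegates to the source."""

    def __init__(self, source: DataSource, projection: list[str]):
        self.source = source
        self.projection = projection

    def schema(self) -> list[str]:
        return self.projection or self.source.schema()

    def execute(self) -> Iterator[RecordBatch]:
        # Projection pushdown: the source never produces unrequested columns.
        return self.source.scan(self.projection)

    def children(self) -> list:
        return []
```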
---

## Projection execution

`ProjectionExec` evaluates a list of physical expressions over each input batch.

For each incoming `RecordBatch`, it:

1. evaluates each projection expression
2. collects the resulting output columns
3. builds a new `RecordBatch` with the projection schema

This captures the batch-oriented style clearly:

- an expression does not produce a single value
- it produces a whole output column for the batch

That is a core mental shift in vectorized engines.

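The three steps map onto the short Python sketch below. Physical expressions are modeled as plain callables that take a batch and return a whole column. `column` and `add` are hypothetical helpers standing in for the engine's expression classes. The point to notice is that each expression call returns one list per batch, not one value per row.

```python
from dataclasses import dataclass
from typing import Callable, Iterator


@dataclass
class RecordBatch:
    schema: list[str]      # column names (stand-in for an Arrow schema)
    columns: list[list]    # one list per column (stand-in for Arrow vectors)


# A physical expression maps a whole batch to a whole output column.
PhysicalExpr = Callable[[RecordBatch], list]


def column(index: int) -> PhysicalExpr:
    """Hypothetical column reference: returns the input column at `index`."""
    return lambda batch: batch.columns[index]


def add(left: PhysicalExpr, right: PhysicalExpr) -> PhysicalExpr:
    """Hypothetical addition: evaluates both sides as columns, then adds pairwise."""
    def evaluate(batch: RecordBatch) -> list:
        return [a + b for a, b in zip(left(batch), right(batch))]
    return evaluate


class ProjectionExec:
    def __init__(self, child, schema: list[str], exprs: list[PhysicalExpr]):
        self.child = child
        self._schema = schema
        self.exprs = exprs

    def schema(self) -> list[str]:
        return self._schema

    def children(self) -> list:
        return [self.child]

    def execute(self) -> Iterator[RecordBatch]:
        for batch in self.child.execute():
            # 1. evaluate each expression over the whole batch
            # 2. collect the resulting columns
            columns = [expr(batch) for expr in self.exprs]
            # 3. wrap them in a new batch carrying the projection schema
            yield RecordBatch(self._schema, columns)
```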
---

## Selection execution

`SelectionExec` evaluates a predicate expression against a batch and expects the result to be a boolean Arrow `BitVector`.

It then:

1. counts selected rows
2. allocates filtered output vectors
3. copies only rows whose predicate bit is true
4. returns a filtered batch with the same schema

This is a useful example of how vectorized filtering really works:

- the predicate is first materialized as a boolean selection vector
- then each input column is filtered using that vector

So selection is not "loop through rows and emit rows" at the conceptual boundary. It is "evaluate a batch predicate, then compact batch columns."

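Here is a Python sketch of the same two-phase filter. It assumes the predicate returns a Python list of booleans in place of an Arrow `BitVector`. The exact-size allocation in step 2 mirrors the count-then-allocate order described above.

```python
from dataclasses import dataclass
from typing import Callable, Iterator


@dataclass
class RecordBatch:
    schema: list[str]
    columns: list[list]


class SelectionExec:
    def __init__(self, child, predicate: Callable[[RecordBatch], list[bool]]):
        self.child = child
        self.predicate = predicate   # batch -> one bool per row (stand-in for a BitVector)

    def children(self) -> list:
        return [self.child]

    def execute(self) -> Iterator[RecordBatch]:
        for batch in self.child.execute():
            mask = self.predicate(batch)               # materialize the selection vector
            count = sum(1 for bit in mask if bit)      # 1. count selected rows
            filtered = []
            for col in batch.columns:
                out = [None] * count                   # 2. allocate output of exact size
                j = 0
                for value, bit in zip(col, mask):
                    if bit:                            # 3. copy rows whose bit is true
                        out[j] = value
                        j += 1
                filtered.append(out)
            yield RecordBatch(batch.schema, filtered)  # 4. same schema, fewer rows
```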
---

## Hash aggregation

`HashAggregateExec` is the aggregate runtime operator.

It keeps a map from each group key to a list of aggregate accumulators.

Its runtime shape is:

1. evaluate grouping expressions on the batch
2. evaluate aggregate input expressions on the batch
3. iterate through rows
4. look up or create accumulator state for the row's grouping key
5. update accumulators
6. after all input is consumed, build one output batch from the accumulated state

This is a blocking operator: it must inspect all input before final results are ready.

That matters because the physical plan may be largely pipelined, but some operators introduce a barrier.

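The six steps translate into the Python sketch below. Grouping and aggregate-input expressions are callables that return whole columns. `SumAccumulator` and `CountAccumulator` are hypothetical accumulators that skip nulls, as SQL `SUM` and `COUNT(expr)` do. For brevity, the sketch does not handle a global aggregate (no `GROUP BY`) over zero input rows, where SQL still returns one row.

```python
from dataclasses import dataclass
from typing import Callable, Iterator


@dataclass
class RecordBatch:
    schema: list[str]
    columns: list[list]


class SumAccumulator:
    def __init__(self):
        self.total = None                 # SUM over zero non-null values is NULL

    def accumulate(self, value):
        if value is not None:             # SQL SUM skips nulls
            self.total = value if self.total is None else self.total + value

    def final_value(self):
        return self.total


class CountAccumulator:
    def __init__(self):
        self.count = 0

    def accumulate(self, value):
        if value is not None:             # COUNT(expr) counts non-null values
            self.count += 1

    def final_value(self):
        return self.count


Expr = Callable[[RecordBatch], list]


class HashAggregateExec:
    def __init__(self, child, schema: list[str], group_exprs: list[Expr],
                 agg_inputs: list[Expr], accumulator_types: list[type]):
        self.child = child
        self._schema = schema
        self.group_exprs = group_exprs
        self.agg_inputs = agg_inputs
        self.accumulator_types = accumulator_types

    def execute(self) -> Iterator[RecordBatch]:
        groups: dict[tuple, list] = {}
        for batch in self.child.execute():
            keys = [e(batch) for e in self.group_exprs]       # 1. grouping columns
            inputs = [e(batch) for e in self.agg_inputs]      # 2. aggregate input columns
            rows = len(keys[0]) if keys else len(inputs[0])
            for row in range(rows):                           # 3. iterate through rows
                key = tuple(col[row] for col in keys)
                accumulators = groups.get(key)
                if accumulators is None:                      # 4. create state for a new key
                    accumulators = [t() for t in self.accumulator_types]
                    groups[key] = accumulators
                for acc, col in zip(accumulators, inputs):    # 5. update accumulators
                    acc.accumulate(col[row])
        # 6. Only now, with all input consumed, build the single output batch.
        key_columns = [[key[i] for key in groups] for i in range(len(self.group_exprs))]
        agg_columns = [[accs[i].final_value() for accs in groups.values()]
                       for i in range(len(self.accumulator_types))]
        yield RecordBatch(self._schema, key_columns + agg_columns)
```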
---

## Partial and final aggregation

The aggregate operator also supports different modes:

- `COMPLETE`
- `PARTIAL`
- `FINAL`

This is important because it shows how distributed execution reuses the same conceptual operator in multiple stages.

The idea is:

- `COMPLETE`: aggregate raw input directly into final answers in a single stage
- `PARTIAL`: produce intermediate aggregate state locally
- `FINAL`: merge intermediate states into final answers

Average is the clearest example.

In general, you cannot merge two averages by averaging the averages. You need both:

- partial sum
- partial count

The companion code models that explicitly through `AvgIntermediateState`.

That is a good example of physical planning and execution needing richer runtime state than the SQL surface suggests.

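The Python sketch below shows why the intermediate state must be the (sum, count) pair. `AvgState`, `partial_avg`, and `final_avg` are hypothetical names, not the companion engine's `AvgIntermediateState` API. Suppose one worker holds 1, 2, 3 and another holds 10. Averaging the two local averages gives (2 + 10) / 2 = 6. Merging the states gives (6 + 10) / (3 + 1) = 4, which is the correct answer.

```python
from dataclasses import dataclass


@dataclass
class AvgState:
    """Intermediate AVG state: unlike a bare average, it merges correctly."""
    total: float = 0.0
    count: int = 0

    def accumulate(self, value):
        if value is not None:        # AVG ignores nulls
            self.total += value
            self.count += 1

    def merge(self, other: "AvgState"):
        self.total += other.total
        self.count += other.count

    def final_value(self):
        return None if self.count == 0 else self.total / self.count


def partial_avg(values) -> AvgState:
    """PARTIAL mode: one worker turns its local rows into intermediate state."""
    state = AvgState()
    for v in values:
        state.accumulate(v)
    return state


def final_avg(states) -> float:
    """FINAL mode: merge intermediate states from all workers, then finish."""
    merged = AvgState()
    for s in states:
        merged.merge(s)
    return merged.final_value()
```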
---

## Hash join

`HashJoinExec` implements joins with a classic build/probe strategy.

It:

1. fully reads the right input
2. builds a hash table keyed by the right join columns
3. streams through the left input
4. probes the hash table for matches
5. constructs joined rows

For left joins, it emits unmatched left rows with null-filled right columns.

For right joins, it performs extra work after probing to emit unmatched right rows with null-filled left columns.

Two practical details are worth noting.

First, the planner resolves join column names to physical column indices ahead of execution. Runtime code wants positions, not symbolic names.

Second, duplicate key columns from the right side may be excluded from the output schema when the join keys have the same name. That is a schema-shaping concern the planner and operator must agree on.

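The Python sketch below implements the build/probe loop with inner, left, and right variants. It rests on several simplifying assumptions:

- there is a single equi-join key, whose column positions (`left_key`, `right_key`) the planner has already resolved
- every right-side column is kept, rather than dropping duplicate key columns
- NULL keys never match
- the operator emits one output batch per probed left batch

All class and parameter names are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class RecordBatch:
    schema: list[str]
    columns: list[list]


class HashJoinExec:
    def __init__(self, left, right, left_schema: list[str], right_schema: list[str],
                 left_key: int, right_key: int, join_type: str = "inner"):
        self.left, self.right = left, right
        self.left_key, self.right_key = left_key, right_key   # positions, not names
        self.left_width, self.right_width = len(left_schema), len(right_schema)
        self._schema = left_schema + right_schema             # duplicate keys kept here
        self.join_type = join_type

    def _batch(self, rows: list[tuple]) -> RecordBatch:
        # Transpose row tuples back into columns.
        return RecordBatch(self._schema, [list(col) for col in zip(*rows)])

    def execute(self) -> Iterator[RecordBatch]:
        # Build phase (blocking): read the whole right input into key -> row ids.
        right_rows: list[tuple] = []
        table: dict = {}
        for batch in self.right.execute():
            for row in zip(*batch.columns):
                if row[self.right_key] is not None:           # NULL keys never match
                    table.setdefault(row[self.right_key], []).append(len(right_rows))
                right_rows.append(row)
        matched = [False] * len(right_rows)

        # Probe phase (streaming): one output batch per left batch.
        for batch in self.left.execute():
            out = []
            for left_row in zip(*batch.columns):
                hits = table.get(left_row[self.left_key], [])
                for i in hits:
                    matched[i] = True
                    out.append(left_row + right_rows[i])
                if not hits and self.join_type == "left":
                    out.append(left_row + (None,) * self.right_width)
            if out:
                yield self._batch(out)

        # Right joins: after probing, emit right rows that never matched.
        if self.join_type == "right":
            unmatched = [(None,) * self.left_width + row
                         for row, hit in zip(right_rows, matched) if not hit]
            if unmatched:
                yield self._batch(unmatched)
```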
---

## Limit execution

`LimitExec` is simple but instructive.

It streams input batches until the requested number of rows has been produced. If the limit falls in the middle of a batch, it truncates that batch and stops.

This shows that even "tiny" operators still need batch-aware logic:

- whole-batch pass-through when possible
- partial-batch copying when necessary

So batch-oriented execution changes the shape of even very simple operators.

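Below is a Python sketch of the batch-aware limit, using a toy list-based `RecordBatch`. The operator returns as soon as the limit is met, so it stops pulling from its child. In a lazily evaluated pipeline, this is what lets a limit cut off upstream work early.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class RecordBatch:
    schema: list[str]
    columns: list[list]

    def row_count(self) -> int:
        return len(self.columns[0]) if self.columns else 0


class LimitExec:
    def __init__(self, child, limit: int):
        self.child = child
        self.limit = limit

    def children(self) -> list:
        return [self.child]

    def execute(self) -> Iterator[RecordBatch]:
        remaining = self.limit
        if remaining <= 0:
            return
        for batch in self.child.execute():
            n = batch.row_count()
            if n <= remaining:
                yield batch                    # whole-batch pass-through
                remaining -= n
            else:
                # partial-batch copy: keep only the first `remaining` rows
                yield RecordBatch(batch.schema, [col[:remaining] for col in batch.columns])
                remaining = 0
            if remaining == 0:
                return                         # limit reached: stop pulling input
```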
---

## Expression lowering at the physical boundary

The query planner also lowers logical expressions into physical expressions.

Examples:

- `Column(name)` -> `ColumnExpression(index)`
- `LiteralLong` -> `LiteralLongExpression`
- `Eq` -> `EqExpression`
- `Add` -> `AddExpression`
- `CastExpr` -> `CastExpression`

One important detail is that aliases disappear at this stage.

That is correct because aliases affect:

- names
- schema
- planning

They do not affect the actual runtime computation.

This is a good example of semantics getting split cleanly across layers.

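The lowering step, including the dropped alias, can be sketched in Python as follows. The logical and physical classes are hypothetical mirrors of the names listed above. `Alias` is an assumed name for the aliasing wrapper. Only column references, long literals, and addition are covered.

```python
from dataclasses import dataclass


# Hypothetical logical expressions: columns are referenced by name.
@dataclass
class Column:
    name: str


@dataclass
class LiteralLong:
    value: int


@dataclass
class Add:
    left: object
    right: object


@dataclass
class Alias:
    expr: object
    alias: str


# Hypothetical physical expressions: columns are referenced by position.
@dataclass
class ColumnExpression:
    index: int


@dataclass
class LiteralLongExpression:
    value: int


@dataclass
class AddExpression:
    left: object
    right: object


def lower(expr, input_schema: list[str]):
    """Translate a logical expression into a physical one for a given input schema."""
    if isinstance(expr, Column):
        # Name -> position; list.index raises ValueError for an unknown column.
        return ColumnExpression(input_schema.index(expr.name))
    if isinstance(expr, LiteralLong):
        return LiteralLongExpression(expr.value)
    if isinstance(expr, Add):
        return AddExpression(lower(expr.left, input_schema), lower(expr.right, input_schema))
    if isinstance(expr, Alias):
        # The alias only names the output column in the schema; the runtime work
        # is whatever the wrapped expression computes, so the wrapper is dropped.
        return lower(expr.expr, input_schema)
    raise NotImplementedError(type(expr).__name__)
```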
---

## Operator tree and dataflow

The physical plan is still a tree of operators, but now it is executable.

A simple query such as:

```sql
SELECT name
FROM employees
WHERE age > 18
LIMIT 10
```

typically becomes a chain like the following, listed from the leaf operator up to the root:

1. `ScanExec`
2. `SelectionExec`
3. `ProjectionExec`
4. `LimitExec`

Each operator consumes batches from its child and emits new batches upward.

That is the operational heart of the engine.

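The same four-operator chain can be sketched as nested Python generators, each pulling batches from its child. Batches here are plain dicts mapping column name to list, a simplification of `RecordBatch`. The operator functions and `example_plan` are hypothetical. Note that the scan must still read `age`: the query returns only `name`, but the selection above the scan needs `age` to evaluate its predicate.

```python
from typing import Callable, Iterator

# Simplified batch: column name -> list of values.
Batch = dict[str, list]


def scan(batches: list[Batch], projection: list[str]) -> Iterator[Batch]:
    for batch in batches:
        yield {name: batch[name] for name in projection}


def selection(child: Iterator[Batch], predicate: Callable[[Batch], list[bool]]) -> Iterator[Batch]:
    for batch in child:
        mask = predicate(batch)
        yield {name: [v for v, keep in zip(col, mask) if keep] for name, col in batch.items()}


def projection(child: Iterator[Batch], names: list[str]) -> Iterator[Batch]:
    for batch in child:
        yield {name: batch[name] for name in names}


def limit(child: Iterator[Batch], n: int) -> Iterator[Batch]:
    remaining = n
    if remaining <= 0:
        return
    for batch in child:
        rows = len(next(iter(batch.values()), []))
        take = min(rows, remaining)
        yield {name: col[:take] for name, col in batch.items()}
        remaining -= take
        if remaining == 0:
            return


def example_plan(data: list[Batch]) -> Iterator[Batch]:
    """SELECT name FROM employees WHERE age > 18 LIMIT 10, built leaf first."""
    scanned = scan(data, ["name", "age"])
    filtered = selection(scanned, lambda b: [age > 18 for age in b["age"]])
    projected = projection(filtered, ["name"])
    return limit(projected, 10)
```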
---

## Blocking vs streaming behavior

The physical layer makes it obvious which operators can stream and which cannot.

Mostly streaming:

- scan
- projection
- selection
- limit

Blocking or partially blocking:

- hash aggregate
- hash join build phase
- right-join unmatched-row handling

This distinction matters because runtime latency and memory behavior depend as much on operator blocking properties as on logical operator choice.

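The difference can be observed directly with Python generators, used here as a stand-in for lazy batch streams. The hypothetical `source` logs every batch it hands out. Asking each operator for its first output shows the contrast: a streaming operator pulls one input batch, while a blocking aggregate drains the entire input first.

```python
from typing import Iterator


def source(n_batches: int, log: list) -> Iterator[list[int]]:
    """Hypothetical source: yields small batches and logs each one it hands out."""
    for i in range(n_batches):
        log.append(i)
        yield [i, i + 1]


def streaming_double(child: Iterator[list[int]]) -> Iterator[list[int]]:
    # Streaming: each output batch depends only on the current input batch.
    for batch in child:
        yield [v * 2 for v in batch]


def blocking_sum(child: Iterator[list[int]]) -> Iterator[list[int]]:
    # Blocking: the single result exists only after the whole input is drained.
    total = 0
    for batch in child:
        total += sum(batch)
    yield [total]
```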
---

## Strengths of the companion design

The companion physical layer is especially useful as a teaching architecture because:

- the logical-to-physical mapping is easy to see
- operators are concrete and local
- batches are the universal runtime unit
- runtime expressions are clearly separate from logical expressions
- distributed extensions can reuse the same concepts

It is not trying to be the most optimized design possible. It is trying to make the moving parts visible.

That is why it works well as a study target.

---

## What is simplified relative to a production engine

A production engine would likely add:

- richer operator selection
- more cost-sensitive physical planning
- more join strategies
- more sophisticated memory management
- spill-to-disk behavior
- stronger predicate pushdown and source capabilities
- more operator fusion and code generation

So this note should be read as a physical-plan primer, not as the full space of execution-engine design.

---

## Main takeaways

- Physical planning chooses executable operators and lowers expressions into runtime evaluators.
- Batch-oriented execution changes the shape of every operator, even simple ones like `LIMIT` and `FILTER`.
- Aggregation and joins expose the importance of blocking behavior, runtime state, and schema shaping.
- Planner work continues beyond logical planning: it also resolves indices, output schemas, and runtime operator contracts.
- The physical layer is where abstract query meaning finally turns into actual dataflow.

---

## Related notes

- `hqew/002-query-engine-primer.md`
- `hqew/003-query-engine-architectures.md`
- `hqew/006-query-execution-models.md`
- `hqew/008-joins-and-aggregations.md`
- `hqew/009-distributed-query-engines.md`
- `hqew/015-sql-front-end-and-logical-planning.md`
- `hqew/017-expressions-types-and-nullability.md`

---

## Changelog

* **Apr 7, 2026** -- Removed references to ignored local paths.
* **Apr 7, 2026** -- Added a dedicated note on physical planning, executable operators, and batch-oriented runtime behavior.