Push down topk through join#21621
Conversation
| } else { | ||
| &join.right | ||
| }; | ||
| if matches!(preserved_child.as_ref(), LogicalPlan::Sort(_)) { |
There was a problem hiding this comment.
This condition looks a bit broad.
If the child has no fetch limit or a larger fetch limit than the current one then pushing down the current Sort with its fetch limit would be beneficial, no ?
The optimization should be skipped only if the Sort expr is different or its fetch limit is non-zero but smaller than the current one.
gene-bordegaray
left a comment
There was a problem hiding this comment.
The idea behind the optimizer rule itself makes sense. I think there is some room to add more regression testing and ensure we uphold correctness standards for all cases. Let me know what you think :)
|
|
||
| // Create the new Sort(fetch) on the preserved child | ||
| let new_child_sort = Arc::new(LogicalPlan::Sort(SortPlan { | ||
| expr: sort.expr.clone(), |
There was a problem hiding this comment.
I don't think we shoudl just clone sort.expr here. Since the Sort can sit on top of a Projection, in this case the ORDER BY clause of the query is interpreted against the projection output columns, not directly on the join's child.
When we push down the Sort(fetch) rather than cloning the Sort columns we need to push down the columns that were projected.
I believe behavior for this right now would work like this:
Sort: b, fetch=1
Projection: -t1.b AS b
Join
The optimizer rewrites it into:
Sort: b, fetch=1
Projection: -t1.b AS b
Join
Sort: b, fetch=1 -> This is using the post-projected value!
| # Child has larger fetch: push our tighter limit | ||
| # The inner Sort(fetch=5) has a larger limit than our outer Sort(fetch=2), | ||
| # so pushing fetch=2 to the preserved child reduces data further. | ||
| query TT | ||
| EXPLAIN SELECT * FROM ( | ||
| SELECT t1.a, t1.b, t2.x | ||
| FROM (SELECT * FROM t1 ORDER BY b ASC LIMIT 5) t1 | ||
| LEFT JOIN t2 ON t1.a = t2.x | ||
| ) sub | ||
| ORDER BY b ASC LIMIT 2; | ||
| ---- | ||
| logical_plan | ||
| 01)Sort: sub.b ASC NULLS LAST, fetch=2 | ||
| 02)--SubqueryAlias: sub | ||
| 03)----Left Join: t1.a = t2.x | ||
| 04)------SubqueryAlias: t1 | ||
| 05)--------Sort: t1.b ASC NULLS LAST, fetch=5 | ||
| 06)----------TableScan: t1 projection=[a, b] | ||
| 07)------TableScan: t2 projection=[x] |
There was a problem hiding this comment.
Seems like we don't actually push down the fetch=2 tighter limit into the nested Sort here.
There was a problem hiding this comment.
It is being blocked by subqueryAlias between sort and join. I think I need to update the comment.
There was a problem hiding this comment.
Right; couldn't we push the topk down despite the alias? This seems like a fairly common query structure that it would be nice to support.
There was a problem hiding this comment.
refactored code to handle SubqueryAlias
| if join.filter.is_some() { | ||
| return Ok(Transformed::no(plan)); | ||
| } |
There was a problem hiding this comment.
Might not be necessary for this PR, but would be pretty easy to check if the filter only references non-preserved-side columns, in which case I think we can still do the pushdown?
There was a problem hiding this comment.
removed filter. added UT to verify results are correct.
| /// (`Option<TableReference>`) structurally. A `Bare("t1")` and | ||
| /// `Full { catalog, schema, table: "t1" }` are NOT equal even though they | ||
| /// refer to the same column. After resolving through SubqueryAlias the | ||
| /// variant may differ, so we compare by display string instead. |
There was a problem hiding this comment.
How Expr::to_string() helps with the missing TableReference on one of the sides ?
I don't understand how this is better than Column::eq().
There was a problem hiding this comment.
updated comment and removed to_string()
@kumarUjjawal hello 👋 , I can do a review tonight / tomorrow morning. Thank you! |
|
@SubhamSinghal overall I like the shape of this PR, let's wait for @gene-bordegaray to take another look. |
gene-bordegaray
left a comment
There was a problem hiding this comment.
one case I think might be unsafe, otherwise this is looking good 👍
| return Ok(Transformed::no(plan)); | ||
| } | ||
| if same_exprs { | ||
| // Tighten existing Sort in-place by rebuilding the path |
There was a problem hiding this comment.
I still believe this is unsafe when deep_resolved_exprs becomes volatile while dong through the preserved children
We gurad on the original out sort expr after resolving operators above the join but this deeper scan through the projected children turn a resolved sort col back to volatile I believe. So we could compare something like two random() and push down incorrectly
There was a problem hiding this comment.
@gene-bordegaray thanks for highlighting this. resolved
|
your new commit addresses my concern. Thank you @SubhamSinghal 💯 |
I have the impression there are sufficient eyes on this one already. I don't have much time to look at this at the moment, and not sure I have much insight to share either. |
|
run benchmark sql_planner |
|
In general I think we should be careful with adding new optimizer rules as they slow down planning |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing push-down-topk-through-join (2473ecc) to 50d74a7 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
@kumarUjjawal @gene-bordegaray What needs to be done here? |
|
@SubhamSinghal there are some regression in the performance for some of the queries, can you investigate. See #21621 (comment) |
|
I think the planning overhead is somewhat inevitable since it is an additional pass. Could we run benches or could you show the improvements of this on targeted queries |
Or perhaps add this feature to one of the existing passes (like pushdown sort 🤔 ) |
@gene-bordegaray Added benchmark SQL query, let me know if this is what you were expecting. |
|
could you look into if this can be part of the existing sort pushdown rule like @alamb suggested, maybe repursposing the rule to be more general ratehr than another pass of the plan? |
@alamb @gene-bordegaray I have merged |
|
I am not likely going to have time to review this PR anytime soon unfortunely. I have many other PRs to review and I am not focusing on join optimizations at this time unfortnately |
|
I can help review this next week. @SubhamSinghal as a first step I've split out the benchmarks + SLT tests into #22760. This will allow us to (1) make this PR smaller and (2) review the plan changes and benchmark changes as a diff instead of an addition. |
|
Also FYI @Dandandan given you filed |
…e#22760) ## Which issue does this PR close? - Relates to apache#11900 ## Rationale for this change This splits the test and benchmark scaffolding out of apache#21621 so the `PushDownTopKThroughJoin` optimizer rule itself can be reviewed in isolation, with a small, focused diff. The benchmark and SLT files here do not depend on the rule. They are committed first so that: 1. The benchmark can measure the rule's effect against a baseline that does not register it. 2. The follow-up rule PR's diff shows exactly which plans change, since the EXPLAIN plans here capture the current (pre-rule) behavior. ## What changes are included in this PR? - A `push_down_topk` benchmark (`dfbench push-down-topk`) that runs `ORDER BY <cols> LIMIT N` queries over outer joins against TPC-H `customer`/`orders`/`nation`, plus its query files under `benchmarks/queries/push_down_topk/`. - `push_down_topk_through_join.slt` covering the scenarios the rule handles: preserved-side sort keys, ineligible join types (inner/full/semi/anti), `ON`-clause filters, projection and `SubqueryAlias` resolution, existing child sorts, ties, multi-level joins, `OFFSET`, and volatile expressions. The EXPLAIN plans assert current behavior (TopK not yet pushed through the join). The follow-up PR that adds the rule updates those plans in place; the query-result checks hold regardless of whether the rule is enabled. The new optimizer rule, the `push_down_limit.rs` changes, and the `optimizer_rule_reference.md` update from apache#21621 are intentionally left for the follow-up PR. ## Are these changes tested? Yes — this PR is the tests. `push_down_topk_through_join.slt` passes against `main`, and the benchmark binary compiles and runs. ## Are there any user-facing changes? No. No API changes; only new benchmark and test files plus benchmark CLI wiring. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Rationale for this change
When a query has
ORDER BY <cols> LIMIT Non top of an outer join and all sort columns come from the preserved side,DataFusion currently runs the full join first, then sorts and limits. We can push a copy of the
Sort(fetch=N)to the preserved input, reducing the number of rows entering the join.Before:
After:
What changes are included in this PR?
A new logical optimizer rule
PushDownTopKThroughJointhat:Sortwithfetch = Some(N)(TopK)Projectionto find aJoinSort(fetch=N)on the preserved childAre these changes tested?
Yes through UT
Are there any user-facing changes?
No API changes.