Push down topk through join #21621

Open
SubhamSinghal wants to merge 39 commits into apache:main from SubhamSinghal:push-down-topk-through-join

@SubhamSinghal SubhamSinghal commented Apr 14, 2026

Contributor

Which issue does this PR close?

Rationale for this change

When a query has ORDER BY <cols> LIMIT N on top of an outer join and all sort columns come from the preserved side,
DataFusion currently runs the full join first, then sorts and limits. We can push a copy of the Sort(fetch=N) to the preserved input, reducing the number of rows entering the join.

Before:

  Sort: t1.b ASC, fetch=3
    Left Join: t1.a = t2.a
      Scan: t1     ← scans ALL rows
      Scan: t2

After:

  Sort: t1.b ASC, fetch=3
    Left Join: t1.a = t2.a
      Sort: t1.b ASC, fetch=3  ← pushed down
        Scan: t1               ← only top-3 rows enter join
      Scan: t2
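
A small illustration of why the rewrite preserves results, assuming the sort keys on the preserved side are unique: every t1 row appears at least once in the left join output, so the top-N joined rows can only come from the top-N t1 rows. The helper names (`left_join`, `top_k`, `topk_after_join`, `topk_pushed_down`) and the sample rows are hypothetical and are not taken from the PR. With tied sort keys the two plans may pick different, equally valid, tied rows.

```rust
/// One joined row: (t1.a, t1.b, t2.x), where t2.x is None for unmatched t1 rows.
type Joined = (i32, i32, Option<&'static str>);

/// Left join on `a`: every left row is kept, unmatched rows get `None`.
fn left_join(left: &[(i32, i32)], right: &[(i32, &'static str)]) -> Vec<Joined> {
    let mut out = Vec::new();
    for &(a, b) in left {
        let mut matched = false;
        for &(ra, x) in right {
            if ra == a {
                out.push((a, b, Some(x)));
                matched = true;
            }
        }
        if !matched {
            out.push((a, b, None));
        }
    }
    out
}

/// Stable sort by `key`, then keep the first `k` rows (a Sort with fetch = k).
fn top_k<T: Clone>(rows: &[T], key: impl Fn(&T) -> i32, k: usize) -> Vec<T> {
    let mut sorted = rows.to_vec();
    sorted.sort_by_key(|row| key(row));
    sorted.truncate(k);
    sorted
}

/// "Before" plan: join everything, then sort and limit.
fn topk_after_join(t1: &[(i32, i32)], t2: &[(i32, &'static str)], k: usize) -> Vec<Joined> {
    top_k(&left_join(t1, t2), |row| row.1, k)
}

/// "After" plan: limit the preserved side first, join, then sort and limit again.
fn topk_pushed_down(t1: &[(i32, i32)], t2: &[(i32, &'static str)], k: usize) -> Vec<Joined> {
    let limited = top_k(t1, |row| row.1, k);
    top_k(&left_join(&limited, t2), |row| row.1, k)
}
```

The outer `top_k` in `topk_pushed_down` is still required: a single t1 row can match several t2 rows, so the join may emit more than `k` rows even when only `k` rows enter it.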

What changes are included in this PR?

A new logical optimizer rule PushDownTopKThroughJoin that:

  1. Matches Sort with fetch = Some(N) (TopK)
  2. Looks through an optional Projection to find a Join
  3. Checks join type is LEFT or RIGHT with no non-equijoin filter
  4. Verifies all sort expression columns come from the preserved side
  5. Inserts a copy of the Sort(fetch=N) on the preserved child
  6. Keeps the top-level sort for correctness
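
The steps above can be sketched on a toy plan type. This is a simplified illustration under stated assumptions, not the PR's implementation: `Plan`, `JoinType`, `collect_tables`, and `push_down_topk` are hypothetical names, the toy join carries no filter, and the Projection look-through from step 2 is omitted.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Join { join_type: JoinType, left: Box<Plan>, right: Box<Plan> },
    // Sort keys are (table, column) pairs; `fetch` is the optional LIMIT.
    Sort { keys: Vec<(String, String)>, fetch: Option<usize>, input: Box<Plan> },
}

/// Collects the names of all tables scanned under `plan`.
fn collect_tables(plan: &Plan, out: &mut Vec<String>) {
    match plan {
        Plan::Scan { table } => out.push(table.clone()),
        Plan::Sort { input, .. } => collect_tables(input, out),
        Plan::Join { left, right, .. } => {
            collect_tables(left, out);
            collect_tables(right, out);
        }
    }
}

/// Returns a rewritten plan, or a clone of the input when the rule does not apply.
fn push_down_topk(plan: &Plan) -> Plan {
    // Step 1: only a Sort with a fetch (TopK) qualifies.
    let Plan::Sort { keys, fetch: Some(n), input } = plan else {
        return plan.clone();
    };
    // Step 2 (simplified): the Join must be the direct child of the Sort.
    let Plan::Join { join_type, left, right } = input.as_ref() else {
        return plan.clone();
    };
    // Step 3: only LEFT and RIGHT joins have a preserved side.
    let preserved = match join_type {
        JoinType::Left => left,
        JoinType::Right => right,
        JoinType::Inner => return plan.clone(),
    };
    // Step 4: every sort key must come from the preserved side.
    let mut preserved_tables = Vec::new();
    collect_tables(preserved, &mut preserved_tables);
    if !keys.iter().all(|(table, _)| preserved_tables.contains(table)) {
        return plan.clone();
    }
    // Step 5: insert a copy of Sort(fetch = n) on the preserved child.
    let pushed = Box::new(Plan::Sort {
        keys: keys.clone(),
        fetch: Some(*n),
        input: preserved.clone(),
    });
    let (new_left, new_right) = match join_type {
        JoinType::Left => (pushed, right.clone()),
        _ => (left.clone(), pushed),
    };
    // Step 6: keep the top-level Sort for correctness.
    Plan::Sort {
        keys: keys.clone(),
        fetch: Some(*n),
        input: Box::new(Plan::Join {
            join_type: *join_type,
            left: new_left,
            right: new_right,
        }),
    }
}
```

This sketch does not check whether the preserved child already carries a Sort, so applying it twice would stack a second Sort.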

Are these changes tested?

Yes, through unit tests.

Are there any user-facing changes?

No API changes.

@github-actions github-actions bot added the optimizer (Optimizer rules) and sqllogictest (SQL Logic Tests (.slt)) labels Apr 14, 2026
} else {
    &join.right
};
if matches!(preserved_child.as_ref(), LogicalPlan::Sort(_)) {

Member

This condition looks a bit broad.
If the child has no fetch limit or a larger fetch limit than the current one, then pushing down the current Sort with its fetch limit would be beneficial, no?
The optimization should be skipped only if the Sort expr is different or its fetch limit is non-zero but smaller than the current one.
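
One possible reading of this suggestion as a decision function. This is a sketch, not the code in the PR: `ChildSort` and `should_push` are hypothetical names, and sort expressions are modeled as plain strings. It also treats an equal fetch as "skip", since pushing the same limit again gains nothing.

```rust
/// A Sort already present on the preserved child (hypothetical model).
struct ChildSort {
    keys: Vec<String>,
    fetch: Option<usize>,
}

/// Decides whether pushing `Sort(outer_keys, fetch = outer_fetch)` onto the
/// preserved child is worthwhile.
fn should_push(child: Option<&ChildSort>, outer_keys: &[String], outer_fetch: usize) -> bool {
    match child {
        // No existing Sort on the child: always push.
        None => true,
        // Different sort expressions: skip.
        Some(c) if c.keys.as_slice() != outer_keys => false,
        Some(c) => match c.fetch {
            // Same expressions without a limit: pushing adds a limit.
            None => true,
            // Same expressions: push only when the outer limit is tighter.
            Some(child_fetch) => child_fetch > outer_fetch,
        },
    }
}
```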

Contributor Author

fixed.

Comment thread datafusion/sqllogictest/test_files/push_down_topk_through_join.slt
Comment thread datafusion/sqllogictest/test_files/push_down_topk_through_join.slt

@gene-bordegaray gene-bordegaray left a comment

Contributor

The idea behind the optimizer rule itself makes sense. I think there is some room to add more regression testing and ensure we uphold correctness standards for all cases. Let me know what you think :)


// Create the new Sort(fetch) on the preserved child
let new_child_sort = Arc::new(LogicalPlan::Sort(SortPlan {
    expr: sort.expr.clone(),

Contributor

I don't think we should just clone sort.expr here. Since the Sort can sit on top of a Projection, the ORDER BY clause of the query is interpreted against the projection's output columns, not directly against the join's child.

When we push down the Sort(fetch), rather than cloning the Sort columns, we need to push down the expressions that were projected.

I believe behavior for this right now would work like this:

Sort: b, fetch=1
  Projection: -t1.b AS b
    Join

The optimizer rewrites it into:

Sort: b, fetch=1
  Projection: -t1.b AS b
    Join
      Sort: b, fetch=1 -> This is using the post-projected value!
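
A minimal sketch of the resolution step this comment asks for: rewriting the sort expression in terms of the Projection's input before pushing it down. The `Expr` and `ProjItem` types and the `resolve_through_projection` function are hypothetical stand-ins, not the DataFusion types or the code that landed in the PR.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Negate(Box<Expr>),
}

/// One projection item: `expr AS alias`.
struct ProjItem {
    expr: Expr,
    alias: String,
}

/// Rewrites `sort_expr` so it refers to the Projection's input columns
/// instead of its output aliases.
fn resolve_through_projection(sort_expr: &Expr, projection: &[ProjItem]) -> Expr {
    match sort_expr {
        // A column that names a projection alias is replaced by the aliased
        // expression; any other column is kept as-is.
        Expr::Column(name) => projection
            .iter()
            .find(|item| &item.alias == name)
            .map(|item| item.expr.clone())
            .unwrap_or_else(|| sort_expr.clone()),
        // Recurse into compound expressions.
        Expr::Negate(inner) => {
            Expr::Negate(Box::new(resolve_through_projection(inner, projection)))
        }
    }
}
```

For the plan above, resolving `b` through `-t1.b AS b` yields `-t1.b`, which is the expression the pushed-down Sort would have to order by.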

Contributor Author

fixed

Comment thread datafusion/optimizer/src/push_down_topk_through_join.rs Outdated
Comment on lines +166 to +184
# Child has larger fetch: push our tighter limit
# The inner Sort(fetch=5) has a larger limit than our outer Sort(fetch=2),
# so pushing fetch=2 to the preserved child reduces data further.
query TT
EXPLAIN SELECT * FROM (
SELECT t1.a, t1.b, t2.x
FROM (SELECT * FROM t1 ORDER BY b ASC LIMIT 5) t1
LEFT JOIN t2 ON t1.a = t2.x
) sub
ORDER BY b ASC LIMIT 2;
----
logical_plan
01)Sort: sub.b ASC NULLS LAST, fetch=2
02)--SubqueryAlias: sub
03)----Left Join: t1.a = t2.x
04)------SubqueryAlias: t1
05)--------Sort: t1.b ASC NULLS LAST, fetch=5
06)----------TableScan: t1 projection=[a, b]
07)------TableScan: t2 projection=[x]

Contributor

Seems like we don't actually push down the fetch=2 tighter limit into the nested Sort here.

Contributor Author

It is being blocked by the SubqueryAlias between the Sort and the Join. I think I need to update the comment.

@neilconway neilconway Apr 17, 2026

Contributor

Right; couldn't we push the topk down despite the alias? This seems like a fairly common query structure that it would be nice to support.

Contributor Author

refactored code to handle SubqueryAlias

Comment on lines +127 to +129
if join.filter.is_some() {
    return Ok(Transformed::no(plan));
}

Contributor

Might not be necessary for this PR, but would be pretty easy to check if the filter only references non-preserved-side columns, in which case I think we can still do the pushdown?

Contributor Author

removed filter. added UT to verify results are correct.

Comment thread datafusion/optimizer/src/push_down_topk_through_join.rs Outdated
Comment thread datafusion/optimizer/src/push_down_topk_through_join.rs Outdated
/// (`Option<TableReference>`) structurally. A `Bare("t1")` and
/// `Full { catalog, schema, table: "t1" }` are NOT equal even though they
/// refer to the same column. After resolving through SubqueryAlias the
/// variant may differ, so we compare by display string instead.

Member

How does Expr::to_string() help with the missing TableReference on one of the sides?
I don't understand how this is better than Column::eq().

Contributor Author

updated comment and removed to_string()

@SubhamSinghal SubhamSinghal requested a review from lyne7-sc May 19, 2026 16:16
@gene-bordegaray

Contributor

@gene-bordegaray do you have any more feedback on this?

@kumarUjjawal hello 👋, I can do a review tonight / tomorrow morning. Thank you!

@kumarUjjawal

Contributor

@SubhamSinghal overall I like the shape of this PR, let's wait for @gene-bordegaray to take another look.

@gene-bordegaray gene-bordegaray left a comment

Contributor

one case I think might be unsafe, otherwise this is looking good 👍

    return Ok(Transformed::no(plan));
}
if same_exprs {
    // Tighten existing Sort in-place by rebuilding the path

Contributor

I still believe this is unsafe when deep_resolved_exprs becomes volatile while going through the preserved children.

We guard on the original outer sort expr after resolving operators above the join, but I believe this deeper scan through the projected children can turn a resolved sort column back into a volatile expression. So we could compare something like two random() calls and push down incorrectly.
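
A minimal sketch of the kind of guard this concern implies: re-check volatility on the fully resolved expressions, not only on the outer ones. The `Expr` type, its `volatile` flag, and the `contains_volatile` and `safe_to_push` functions are hypothetical; the PR's actual check may look different.

```rust
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    // `volatile` marks functions such as random() whose result can change
    // between evaluations.
    Call { name: String, volatile: bool, args: Vec<Expr> },
}

/// True if `expr` or any of its arguments is a volatile call.
fn contains_volatile(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => false,
        Expr::Call { volatile, args, .. } => *volatile || args.iter().any(contains_volatile),
    }
}

/// The pushdown is only considered when no resolved sort expression is volatile.
fn safe_to_push(resolved_sort_exprs: &[Expr]) -> bool {
    !resolved_sort_exprs.iter().any(contains_volatile)
}
```

Two textually identical `random()` calls are still different values at runtime, which is why structural equality alone is not enough to justify tightening an existing Sort.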

Contributor Author

@gene-bordegaray thanks for highlighting this. resolved

@gene-bordegaray

Contributor

your new commit addresses my concern. Thank you @SubhamSinghal 💯

@pepijnve

Contributor

@pepijnve Do you have the bandwidth to look at this, would like your thoughts on the PR.

I have the impression there are sufficient eyes on this one already. I don't have much time to look at this at the moment, and not sure I have much insight to share either.

@alamb

alamb commented May 22, 2026

Contributor

run benchmark sql_planner

@alamb

alamb commented May 22, 2026

Contributor

In general I think we should be careful with adding new optimizer rules as they slow down planning

@adriangbot


🤖 Criterion benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4522142693-298-kf9gr 6.12.68+ #1 SMP Wed Apr 1 02:23:28 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing push-down-topk-through-join (2473ecc) to 50d74a7 (merge-base) diff
BENCH_NAME=sql_planner
BENCH_COMMAND=cargo bench --features=parquet --bench sql_planner
BENCH_FILTER=
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot


🤖 Criterion benchmark completed (GKE) | trigger

Details

group                                                 main                                    push-down-topk-through-join
-----                                                 ----                                    ---------------------------
logical_aggregate_with_join                           1.00    418.7±2.32µs        ? ?/sec     1.00    419.7±1.40µs        ? ?/sec
logical_correlated_subquery_exists                    1.00    251.8±2.04µs        ? ?/sec     1.02    255.6±0.71µs        ? ?/sec
logical_correlated_subquery_in                        1.00    251.9±1.76µs        ? ?/sec     1.00    252.7±0.77µs        ? ?/sec
logical_distinct_many_columns                         1.01    318.8±3.23µs        ? ?/sec     1.00    316.7±0.83µs        ? ?/sec
logical_join_4_with_agg_and_filter                    1.00    234.0±1.02µs        ? ?/sec     1.02    237.5±0.93µs        ? ?/sec
logical_join_8_with_agg_sort_limit                    1.00    403.8±2.23µs        ? ?/sec     1.01    409.4±1.25µs        ? ?/sec
logical_join_chain_16                                 1.00    657.5±3.15µs        ? ?/sec     1.01    661.3±3.15µs        ? ?/sec
logical_join_chain_4                                  1.00    114.9±0.38µs        ? ?/sec     1.02    117.2±0.93µs        ? ?/sec
logical_join_chain_8                                  1.00    241.0±0.81µs        ? ?/sec     1.01    243.9±0.99µs        ? ?/sec
logical_multiple_subqueries                           1.00    452.4±2.07µs        ? ?/sec     1.01    455.2±1.64µs        ? ?/sec
logical_nested_cte_4_levels                           1.00    237.5±0.95µs        ? ?/sec     1.02    241.3±1.06µs        ? ?/sec
logical_plan_struct_join_agg_sort                     1.00    168.5±0.87µs        ? ?/sec     1.02    171.7±1.11µs        ? ?/sec
logical_plan_tpcds_all                                1.00     73.9±0.80ms        ? ?/sec     1.01     74.4±0.50ms        ? ?/sec
logical_plan_tpch_all                                 1.00      5.7±0.04ms        ? ?/sec     1.04      5.9±0.04ms        ? ?/sec
logical_scalar_subquery                               1.00    269.7±1.31µs        ? ?/sec     1.02    275.5±0.93µs        ? ?/sec
logical_select_all_from_1000                          1.06      8.5±0.03ms        ? ?/sec     1.00      8.0±0.02ms        ? ?/sec
logical_select_one_from_700                           1.00    274.2±2.71µs        ? ?/sec     1.01    276.0±2.51µs        ? ?/sec
logical_trivial_join_high_numbered_columns            1.00    254.6±1.07µs        ? ?/sec     1.01    256.8±1.09µs        ? ?/sec
logical_trivial_join_low_numbered_columns             1.00    242.8±1.08µs        ? ?/sec     1.01    244.2±1.02µs        ? ?/sec
logical_union_4_branches                              1.00    356.5±1.74µs        ? ?/sec     1.01    358.6±1.04µs        ? ?/sec
logical_union_8_branches                              1.00    679.6±2.94µs        ? ?/sec     1.00    681.0±1.93µs        ? ?/sec
logical_wide_aggregate_100_exprs                      1.01      3.8±0.02ms        ? ?/sec     1.00      3.7±0.02ms        ? ?/sec
logical_wide_case_50_exprs                            1.00   1664.4±8.66µs        ? ?/sec     1.00   1668.2±4.96µs        ? ?/sec
logical_wide_filter_200_predicates                    1.00   1282.5±7.38µs        ? ?/sec     1.00   1288.1±7.23µs        ? ?/sec
logical_wide_filter_50_predicates                     1.00    370.3±2.09µs        ? ?/sec     1.01    374.6±2.21µs        ? ?/sec
optimizer_correlated_exists                           1.00    251.2±0.68µs        ? ?/sec     1.01    253.0±0.98µs        ? ?/sec
optimizer_join_4_with_agg_filter                      1.01    425.2±2.29µs        ? ?/sec     1.00    420.7±1.92µs        ? ?/sec
optimizer_join_chain_4                                1.00    177.7±0.41µs        ? ?/sec     1.00    177.9±0.30µs        ? ?/sec
optimizer_join_chain_8                                1.01    558.4±1.41µs        ? ?/sec     1.00    555.0±1.61µs        ? ?/sec
optimizer_select_all_from_1000                        1.00      4.6±0.00ms        ? ?/sec     1.01      4.6±0.03ms        ? ?/sec
optimizer_select_one_from_700                         1.00    255.0±0.50µs        ? ?/sec     1.00    254.9±0.77µs        ? ?/sec
optimizer_tpcds_all                                   1.00    253.6±3.05ms        ? ?/sec     1.00    254.8±2.68ms        ? ?/sec
optimizer_tpch_all                                    1.00     14.1±0.03ms        ? ?/sec     1.01     14.2±0.04ms        ? ?/sec
optimizer_wide_aggregate_100                          1.00      2.1±0.01ms        ? ?/sec     1.00      2.1±0.00ms        ? ?/sec
optimizer_wide_filter_200                             1.00      3.4±0.02ms        ? ?/sec     1.00      3.4±0.05ms        ? ?/sec
physical_intersection                                 1.00    554.8±2.19µs        ? ?/sec     1.00    557.4±7.24µs        ? ?/sec
physical_join_consider_sort                           1.01    991.4±3.27µs        ? ?/sec     1.00    978.9±2.49µs        ? ?/sec
physical_join_distinct                                1.00    236.5±1.15µs        ? ?/sec     1.01    238.1±0.95µs        ? ?/sec
physical_many_self_joins                              1.02      7.7±0.11ms        ? ?/sec     1.00      7.5±0.02ms        ? ?/sec
physical_plan_clickbench_all                          1.00    119.9±3.99ms        ? ?/sec     1.01    120.7±5.26ms        ? ?/sec
physical_plan_clickbench_q1                           1.03  1340.3±10.45µs        ? ?/sec     1.00  1301.3±22.08µs        ? ?/sec
physical_plan_clickbench_q10                          1.03   1938.7±7.67µs        ? ?/sec     1.00   1887.1±8.38µs        ? ?/sec
physical_plan_clickbench_q11                          1.03      2.3±0.06ms        ? ?/sec     1.00      2.2±0.04ms        ? ?/sec
physical_plan_clickbench_q12                          1.00      2.2±0.10ms        ? ?/sec     1.01      2.3±0.13ms        ? ?/sec
physical_plan_clickbench_q13                          1.04      2.0±0.13ms        ? ?/sec     1.00  1962.2±106.17µs        ? ?/sec
physical_plan_clickbench_q14                          1.04      2.2±0.17ms        ? ?/sec     1.00      2.1±0.08ms        ? ?/sec
physical_plan_clickbench_q15                          1.01   1975.1±5.34µs        ? ?/sec     1.00   1960.1±6.42µs        ? ?/sec
physical_plan_clickbench_q16                          1.03  1711.4±19.47µs        ? ?/sec     1.00  1664.8±29.81µs        ? ?/sec
physical_plan_clickbench_q17                          1.00  1725.5±42.85µs        ? ?/sec     1.03  1772.4±41.09µs        ? ?/sec
physical_plan_clickbench_q18                          1.02  1599.8±60.75µs        ? ?/sec     1.00  1568.7±46.18µs        ? ?/sec
physical_plan_clickbench_q19                          1.02  1946.3±102.36µs        ? ?/sec    1.00  1911.4±49.20µs        ? ?/sec
physical_plan_clickbench_q2                           1.04  1765.5±92.51µs        ? ?/sec     1.00  1705.0±55.94µs        ? ?/sec
physical_plan_clickbench_q20                          1.01   1468.9±5.41µs        ? ?/sec     1.00   1452.7±6.83µs        ? ?/sec
physical_plan_clickbench_q21                          1.03  1718.5±50.60µs        ? ?/sec     1.00  1671.1±27.86µs        ? ?/sec
physical_plan_clickbench_q22                          1.01      2.2±0.12ms        ? ?/sec     1.00      2.1±0.17ms        ? ?/sec
physical_plan_clickbench_q23                          1.07      2.3±0.17ms        ? ?/sec     1.00      2.2±0.01ms        ? ?/sec
physical_plan_clickbench_q24                          1.03      5.6±0.25ms        ? ?/sec     1.00      5.4±0.05ms        ? ?/sec
physical_plan_clickbench_q25                          1.00   1809.8±6.48µs        ? ?/sec     1.01  1829.2±61.61µs        ? ?/sec
physical_plan_clickbench_q26                          1.02  1684.7±42.45µs        ? ?/sec     1.00   1648.5±5.41µs        ? ?/sec
physical_plan_clickbench_q27                          1.01  1936.8±91.44µs        ? ?/sec     1.00  1909.9±39.10µs        ? ?/sec
physical_plan_clickbench_q28                          1.06      2.4±0.17ms        ? ?/sec     1.00      2.2±0.03ms        ? ?/sec
physical_plan_clickbench_q29                          1.03      2.6±0.20ms        ? ?/sec     1.00      2.5±0.14ms        ? ?/sec
physical_plan_clickbench_q3                           1.04  1608.8±73.52µs        ? ?/sec     1.00  1550.9±52.40µs        ? ?/sec
physical_plan_clickbench_q30                          1.00     14.6±0.07ms        ? ?/sec     1.01     14.7±0.35ms        ? ?/sec
physical_plan_clickbench_q31                          1.03      2.3±0.08ms        ? ?/sec     1.00      2.3±0.01ms        ? ?/sec
physical_plan_clickbench_q32                          1.04      2.4±0.08ms        ? ?/sec     1.00      2.3±0.07ms        ? ?/sec
physical_plan_clickbench_q33                          1.00  1884.1±18.72µs        ? ?/sec     1.03  1947.9±57.12µs        ? ?/sec
physical_plan_clickbench_q34                          1.01  1721.9±95.72µs        ? ?/sec     1.00  1707.9±133.03µs        ? ?/sec
physical_plan_clickbench_q35                          1.03  1812.6±85.59µs        ? ?/sec     1.00  1755.4±76.40µs        ? ?/sec
physical_plan_clickbench_q36                          1.02      2.0±0.01ms        ? ?/sec     1.00  1998.2±11.04µs        ? ?/sec
physical_plan_clickbench_q37                          1.06      2.5±0.10ms        ? ?/sec     1.00      2.4±0.09ms        ? ?/sec
physical_plan_clickbench_q38                          1.05      2.6±0.18ms        ? ?/sec     1.00      2.5±0.09ms        ? ?/sec
physical_plan_clickbench_q39                          1.09      2.6±0.22ms        ? ?/sec     1.00      2.4±0.01ms        ? ?/sec
physical_plan_clickbench_q4                           1.05  1440.0±50.13µs        ? ?/sec     1.00  1372.0±37.96µs        ? ?/sec
physical_plan_clickbench_q40                          1.00      3.2±0.01ms        ? ?/sec     1.05      3.4±0.25ms        ? ?/sec
physical_plan_clickbench_q41                          1.00      2.7±0.01ms        ? ?/sec     1.04      2.8±0.20ms        ? ?/sec
physical_plan_clickbench_q42                          1.02      2.9±0.01ms        ? ?/sec     1.00      2.9±0.01ms        ? ?/sec
physical_plan_clickbench_q43                          1.05      3.2±0.15ms        ? ?/sec     1.00      3.1±0.12ms        ? ?/sec
physical_plan_clickbench_q44                          1.04  1540.8±37.75µs        ? ?/sec     1.00  1487.1±18.46µs        ? ?/sec
physical_plan_clickbench_q45                          1.01  1487.0±35.32µs        ? ?/sec     1.00  1465.7±26.46µs        ? ?/sec
physical_plan_clickbench_q46                          1.00   1781.3±6.44µs        ? ?/sec     1.01  1793.6±61.64µs        ? ?/sec
physical_plan_clickbench_q47                          1.01      2.5±0.01ms        ? ?/sec     1.00      2.5±0.07ms        ? ?/sec
physical_plan_clickbench_q48                          1.03      2.7±0.10ms        ? ?/sec     1.00      2.6±0.01ms        ? ?/sec
physical_plan_clickbench_q49                          1.01      2.9±0.15ms        ? ?/sec     1.00      2.9±0.06ms        ? ?/sec
physical_plan_clickbench_q5                           1.04   1545.3±5.66µs        ? ?/sec     1.00   1488.2±7.00µs        ? ?/sec
physical_plan_clickbench_q50                          1.00      2.6±0.01ms        ? ?/sec     1.10      2.8±0.20ms        ? ?/sec
physical_plan_clickbench_q51                          1.00  1886.6±68.21µs        ? ?/sec     1.01  1900.9±120.35µs        ? ?/sec
physical_plan_clickbench_q6                           1.06  1590.5±15.61µs        ? ?/sec     1.00  1505.0±18.01µs        ? ?/sec
physical_plan_clickbench_q7                           1.02  1381.6±17.23µs        ? ?/sec     1.00  1360.9±25.74µs        ? ?/sec
physical_plan_clickbench_q8                           1.04  1939.7±83.50µs        ? ?/sec     1.00  1861.9±70.33µs        ? ?/sec
physical_plan_clickbench_q9                           1.04  1869.7±90.59µs        ? ?/sec     1.00  1803.0±57.99µs        ? ?/sec
physical_plan_struct_join_agg_sort                    1.00   1224.0±3.14µs        ? ?/sec     1.00  1224.5±14.31µs        ? ?/sec
physical_plan_tpcds_all                               1.02   664.6±14.14ms        ? ?/sec     1.00   652.1±13.51ms        ? ?/sec
physical_plan_tpch_all                                1.02     43.8±1.20ms        ? ?/sec     1.00     43.0±1.12ms        ? ?/sec
physical_plan_tpch_q1                                 1.00   1402.3±2.41µs        ? ?/sec     1.01  1415.0±13.18µs        ? ?/sec
physical_plan_tpch_q10                                1.04      2.8±0.09ms        ? ?/sec     1.00      2.7±0.00ms        ? ?/sec
physical_plan_tpch_q11                                1.00      2.1±0.00ms        ? ?/sec     1.00      2.1±0.00ms        ? ?/sec
physical_plan_tpch_q12                                1.00   1118.1±2.59µs        ? ?/sec     1.01  1127.1±12.12µs        ? ?/sec
physical_plan_tpch_q13                                1.00    879.0±1.33µs        ? ?/sec     1.01    884.2±1.64µs        ? ?/sec
physical_plan_tpch_q14                                1.03   1343.6±8.67µs        ? ?/sec     1.00   1304.8±5.33µs        ? ?/sec
physical_plan_tpch_q16                                1.02  1470.5±10.57µs        ? ?/sec     1.00   1447.7±5.35µs        ? ?/sec
physical_plan_tpch_q17                                1.02   1636.8±2.10µs        ? ?/sec     1.00  1608.4±16.49µs        ? ?/sec
physical_plan_tpch_q18                                1.01   1732.8±2.98µs        ? ?/sec     1.00  1710.4±19.83µs        ? ?/sec
physical_plan_tpch_q19                                1.02   1616.3±7.23µs        ? ?/sec     1.00   1578.1±2.85µs        ? ?/sec
physical_plan_tpch_q2                                 1.02      4.0±0.07ms        ? ?/sec     1.00      3.9±0.02ms        ? ?/sec
physical_plan_tpch_q20                                1.03      2.2±0.03ms        ? ?/sec     1.00      2.1±0.02ms        ? ?/sec
physical_plan_tpch_q21                                1.02      2.9±0.05ms        ? ?/sec     1.00      2.8±0.05ms        ? ?/sec
physical_plan_tpch_q22                                1.04  1531.0±26.44µs        ? ?/sec     1.00  1478.3±19.05µs        ? ?/sec
physical_plan_tpch_q3                                 1.01  1811.5±16.77µs        ? ?/sec     1.00  1801.1±18.59µs        ? ?/sec
physical_plan_tpch_q4                                 1.00   1082.2±2.85µs        ? ?/sec     1.01   1087.7±8.34µs        ? ?/sec
physical_plan_tpch_q5                                 1.00      2.4±0.00ms        ? ?/sec     1.02      2.5±0.07ms        ? ?/sec
physical_plan_tpch_q6                                 1.00    592.9±1.34µs        ? ?/sec     1.01    597.5±2.02µs        ? ?/sec
physical_plan_tpch_q7                                 1.01      2.9±0.01ms        ? ?/sec     1.00      2.9±0.00ms        ? ?/sec
physical_plan_tpch_q8                                 1.04      4.1±0.03ms        ? ?/sec     1.00      3.9±0.08ms        ? ?/sec
physical_plan_tpch_q9                                 1.00      2.7±0.04ms        ? ?/sec     1.01      2.8±0.08ms        ? ?/sec
physical_select_aggregates_from_200                   1.00     13.3±0.05ms        ? ?/sec     1.00     13.3±0.02ms        ? ?/sec
physical_select_all_from_1000                         1.03     17.5±0.08ms        ? ?/sec     1.00     16.9±0.11ms        ? ?/sec
physical_select_one_from_700                          1.01    712.7±1.79µs        ? ?/sec     1.00    708.9±2.45µs        ? ?/sec
physical_sorted_union_order_by_10_int64               1.00      4.2±0.04ms        ? ?/sec     1.02      4.3±0.10ms        ? ?/sec
physical_sorted_union_order_by_10_uint64              1.00      8.3±0.02ms        ? ?/sec     1.00      8.2±0.03ms        ? ?/sec
physical_sorted_union_order_by_50_int64               1.00    104.4±1.60ms        ? ?/sec     1.00    104.6±1.64ms        ? ?/sec
physical_sorted_union_order_by_50_uint64              1.00    362.6±6.20ms        ? ?/sec     1.00    361.3±5.72ms        ? ?/sec
physical_theta_join_consider_sort                     1.00   1022.4±4.99µs        ? ?/sec     1.00   1019.4±5.55µs        ? ?/sec
physical_unnest_to_join                               1.00    630.9±2.00µs        ? ?/sec     1.00    630.2±2.78µs        ? ?/sec
physical_window_function_partition_by_12_on_values    1.00    704.6±2.17µs        ? ?/sec     1.00    704.7±1.59µs        ? ?/sec
physical_window_function_partition_by_30_on_values    1.01   1398.5±2.81µs        ? ?/sec     1.00   1390.4±2.67µs        ? ?/sec
physical_window_function_partition_by_4_on_values     1.01    436.6±1.24µs        ? ?/sec     1.00    433.4±3.17µs        ? ?/sec
physical_window_function_partition_by_7_on_values     1.00    528.7±2.35µs        ? ?/sec     1.00   527.7±13.60µs        ? ?/sec
physical_window_function_partition_by_8_on_values     1.00    566.4±1.92µs        ? ?/sec     1.00    565.7±2.08µs        ? ?/sec
with_param_values_many_columns                        1.01    433.6±2.34µs        ? ?/sec     1.00    428.5±1.99µs        ? ?/sec

Resource Usage

base (merge-base)

| Metric | Value |
|---|---|
| Wall time | 1555.3s |
| Peak memory | 19.6 GiB |
| Avg memory | 19.6 GiB |
| CPU user | 1867.6s |
| CPU sys | 1.9s |
| Peak spill | 0 B |

branch

| Metric | Value |
|---|---|
| Wall time | 1555.3s |
| Peak memory | 19.6 GiB |
| Avg memory | 19.6 GiB |
| CPU user | 1868.6s |
| CPU sys | 1.5s |
| Peak spill | 0 B |

@SubhamSinghal

Contributor Author

@kumarUjjawal @gene-bordegaray What needs to be done here?

@kumarUjjawal

Contributor

@SubhamSinghal there are performance regressions in some of the queries; can you investigate? See #21621 (comment)

@gene-bordegaray

Contributor

I think the planning overhead is somewhat inevitable since it is an additional pass. Could we run benches, or could you show the improvements of this on targeted queries?

@alamb

alamb commented May 26, 2026

Contributor

> I think the planning overhead is somewhat inevitable since it is an additional pass. Could we run benches or could you show the improvements of this on targeted queries

Or perhaps add this feature to one of the existing passes (like pushdown sort 🤔 )

@SubhamSinghal

Contributor Author

> Could we run benches or could you show the improvements of this on targeted queries

@gene-bordegaray I added a benchmark SQL query; let me know if this is what you were expecting.

@gene-bordegaray

Contributor

Could you look into whether this can be part of the existing sort pushdown rule, as @alamb suggested, maybe repurposing the rule to be more general rather than adding another pass over the plan?

@SubhamSinghal

Contributor Author

> Or perhaps add this feature to one of the existing passes (like pushdown sort 🤔 )

@alamb @gene-bordegaray I have merged the pushDownTopK rule with pushDownLimit; let me know if the changes are OK.

@alamb

alamb commented Jun 4, 2026

Contributor

I am not likely going to have time to review this PR anytime soon, unfortunately. I have many other PRs to review and I am not focusing on join optimizations at this time.

@adriangb

adriangb commented Jun 4, 2026

Contributor

I can help review this next week.

@SubhamSinghal as a first step I've split out the benchmarks + SLT tests into #22760. This will allow us to (1) make this PR smaller and (2) review the plan changes and benchmark changes as a diff instead of an addition.

@adriangb adriangb self-assigned this Jun 4, 2026
@alamb

alamb commented Jun 4, 2026

Contributor

Also FYI @Dandandan given you filed

pull Bot pushed a commit to TCeason/arrow-datafusion that referenced this pull request Jun 4, 2026
…e#22760)

## Which issue does this PR close?

- Relates to apache#11900

## Rationale for this change

This splits the test and benchmark scaffolding out of apache#21621 so the
`PushDownTopKThroughJoin` optimizer rule itself can be reviewed in
isolation, with a small, focused diff.

The benchmark and SLT files here do not depend on the rule. They are
committed first so that:

1. The benchmark can measure the rule's effect against a baseline that
   does not register it.
2. The follow-up rule PR's diff shows exactly which plans change, since
   the EXPLAIN plans here capture the current (pre-rule) behavior.

## What changes are included in this PR?

- A `push_down_topk` benchmark (`dfbench push-down-topk`) that runs
  `ORDER BY <cols> LIMIT N` queries over outer joins against TPC-H
  `customer`/`orders`/`nation`, plus its query files under
  `benchmarks/queries/push_down_topk/`.
- `push_down_topk_through_join.slt` covering the scenarios the rule
  handles: preserved-side sort keys, ineligible join types
  (inner/full/semi/anti), `ON`-clause filters, projection and
  `SubqueryAlias` resolution, existing child sorts, ties, multi-level
  joins, `OFFSET`, and volatile expressions.
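As a rough illustration of the eligibility conditions these scenarios exercise, here is a minimal Rust sketch. The `JoinKind` and `Side` enums and the `topk_pushdown_side` function are hypothetical stand-ins, not DataFusion's actual types or the rule's real code. The sketch only models the join-type, filter, and sort-key-side checks; existing child sorts, `OFFSET`, and volatile expressions are left out.

```rust
/// Hypothetical, simplified join kinds (not DataFusion's `JoinType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
}

/// Which join input a sort column resolves to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

/// Returns the input that may receive a copy of the TopK, or `None` when the
/// pushdown is not applicable under the modeled conditions.
fn topk_pushdown_side(
    kind: JoinKind,
    has_non_equi_filter: bool,
    sort_key_sides: &[Side],
) -> Option<Side> {
    // Only outer joins with a single preserved side qualify.
    let preserved = match kind {
        JoinKind::Left => Side::Left,
        JoinKind::Right => Side::Right,
        _ => return None,
    };
    // A non-equijoin filter, or an empty sort key list, disables the pushdown.
    if has_non_equi_filter || sort_key_sides.is_empty() {
        return None;
    }
    // Every sort column must come from the preserved side.
    sort_key_sides
        .iter()
        .all(|side| *side == preserved)
        .then_some(preserved)
}
```

For example, `topk_pushdown_side(JoinKind::Left, false, &[Side::Left])` yields `Some(Side::Left)`, while an inner join or a sort key that mixes both sides yields `None`.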

The EXPLAIN plans assert current behavior (TopK not yet pushed through
the join). The follow-up PR that adds the rule updates those plans in
place; the query-result checks hold regardless of whether the rule is
enabled.
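The reason the result checks are expected to hold either way can be sketched with a toy model in Rust. The row layout, the nested-loop `left_join`, and both `topk_*` functions are illustrative assumptions, not DataFusion code. A full-row tie-breaker is added to the ordering so the two plans can be compared exactly; real SQL would allow tied rows to come back in any order.

```rust
/// Output row: (a, b, c), where `c` is `None` for unmatched left rows.
type Out = (i32, i32, Option<i32>);

/// Toy nested-loop LEFT JOIN on `a`. Left rows are (a, b); right rows are (a, c).
fn left_join(left: &[(i32, i32)], right: &[(i32, i32)]) -> Vec<Out> {
    let mut out = Vec::new();
    for &(a, b) in left {
        let mut matched = false;
        for &(ra, c) in right {
            if ra == a {
                out.push((a, b, Some(c)));
                matched = true;
            }
        }
        // Unmatched left rows are preserved with a NULL right side.
        if !matched {
            out.push((a, b, None));
        }
    }
    out
}

/// Models `ORDER BY b LIMIT n` applied after the join (the pre-rule plan).
fn topk_after_join(left: &[(i32, i32)], right: &[(i32, i32)], n: usize) -> Vec<Out> {
    let mut rows = left_join(left, right);
    rows.sort_by_key(|r| (r.1, r.0, r.2)); // b first, then a tie-breaker
    rows.truncate(n);
    rows
}

/// Models the rewritten plan: TopK on the preserved (left) input first,
/// then the join, then the retained top-level TopK.
fn topk_with_pushdown(left: &[(i32, i32)], right: &[(i32, i32)], n: usize) -> Vec<Out> {
    let mut top_left = left.to_vec();
    top_left.sort_by_key(|r| (r.1, r.0));
    top_left.truncate(n);
    // The join can fan out, so the top-level TopK is still required.
    topk_after_join(&top_left, right, n)
}
```

Because every left row produces at least one output row in a left join, the first `n` output rows can only come from the first `n` left rows, so both functions return the same rows. The join may still emit more than `n` rows from those inputs, which is why the top-level sort stays in place.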

The new optimizer rule, the `push_down_limit.rs` changes, and the
`optimizer_rule_reference.md` update from apache#21621 are intentionally left
for the follow-up PR.

## Are these changes tested?

Yes — this PR is the tests. `push_down_topk_through_join.slt` passes
against `main`, and the benchmark binary compiles and runs.

## Are there any user-facing changes?

No. No API changes; only new benchmark and test files plus benchmark CLI
wiring.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Labels

core Core DataFusion crate optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)

Development

Successfully merging this pull request may close these issues.

Push down TopK below Join