From 173b53ca9710dc0eac27700f43102fb16435f1ec Mon Sep 17 00:00:00 2001 From: SubhamSinghal Date: Tue, 30 Jun 2026 22:22:13 +0530 Subject: [PATCH 1/3] Extend WindowTopN to support RANK --- .../sqllogictest/test_files/window_topn.slt | 482 ++++++++++++++++-- 1 file changed, 451 insertions(+), 31 deletions(-) diff --git a/datafusion/sqllogictest/test_files/window_topn.slt b/datafusion/sqllogictest/test_files/window_topn.slt index bf9ce26b35537..2eb72f519b267 100644 --- a/datafusion/sqllogictest/test_files/window_topn.slt +++ b/datafusion/sqllogictest/test_files/window_topn.slt @@ -64,7 +64,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 3: rn < 4 should give same results (fetch=3) @@ -131,7 +131,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 7: Filter on data column (not window output) — should NOT optimize @@ -164,7 +164,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] statement ok @@ -236,19 +236,20 @@ physical_plan 33)│ PartitionedTopKExec │ 34)│ -------------------- │ 35)│ fetch: 3 │ -36)│ │ -37)│ order: │ -38)│ [val@2 ASC NULLS LAST] │ -39)│ │ -40)│ partition: [pk@1] │ -41)└─────────────┬─────────────┘ -42)┌─────────────┴─────────────┐ -43)│ DataSourceExec │ -44)│ -------------------- │ -45)│ bytes: 480 │ -46)│ format: memory │ -47)│ rows: 1 │ -48)└───────────────────────────┘ +36)│ fn: row_number │ +37)│ │ +38)│ order: │ +39)│ [val@2 ASC NULLS LAST] │ +40)│ │ +41)│ partition: [pk@1] │ +42)└─────────────┬─────────────┘ +43)┌─────────────┴─────────────┐ +44)│ DataSourceExec │ +45)│ -------------------- │ +46)│ bytes: 480 │ +47)│ format: memory │ +48)│ rows: 1 │ +49)└───────────────────────────┘ statement ok SET datafusion.explain.format = indent; @@ -308,10 +309,9 @@ EXPLAIN SELECT * FROM ( ---- physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as rnk] -02)--FilterExec: rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 <= 3 -03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] -05)--------DataSourceExec: partitions=1, partition_sizes=[1] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 14: Filter on rn AND rnk — compound predicate should NOT optimize query TT @@ -360,7 +360,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1, id@0], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1, id@0], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] statement ok @@ -391,7 +391,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[id@0], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[id@0], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 19: Overlapping keys correctness (each id is unique, so rn=1 for all) @@ -426,7 +426,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 DESC] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 DESC] 04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 21: Correctness for PARTITION BY pk ORDER BY pk, val DESC @@ -460,7 +460,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 DESC] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 DESC] 04)------DataSourceExec: partitions=1, partition_sizes=[1] statement ok @@ -494,7 +494,7 @@ QUALIFY rn <= 3; physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] # Test 30: QUALIFY with < operator @@ -522,10 +522,9 @@ QUALIFY rnk <= 3; ---- physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rnk] -02)--FilterExec: rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 3 -03)----BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] -05)--------DataSourceExec: partitions=1, partition_sizes=[1] +02)--BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] statement ok SET datafusion.explain.physical_plan_only = false; @@ -601,7 +600,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 ASC] +03)----PartitionedTopKExec: fn=row_number, fetch=2, partition=[pk@1], order=[val@2 ASC] 04)------DataSourceExec: partitions=1, partition_sizes=[1] query TT @@ -612,7 +611,7 @@ EXPLAIN SELECT * FROM ( physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 DESC NULLS LAST] +03)----PartitionedTopKExec: fn=row_number, fetch=2, partition=[pk@1], order=[val@2 DESC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] statement ok @@ -621,6 +620,427 @@ SET datafusion.explain.physical_plan_only = false; statement ok DROP TABLE window_topn_nulls; +############################################################################### +# RANK() tests +############################################################################### +# +# RANK semantics differ from ROW_NUMBER in that ties at the boundary are +# retained (`WHERE rk <= K` may keep more than K rows per partition). The +# tests below exercise both the boundary-Equal case (incoming row tied +# with current K-th-best) and the boundary-unchanged-after-eviction case +# (PartitionedTopKRank: heap evicts a tied row → push to per-partition +# `ties` Vec). + +# Table designed to produce ties at and around the rank-K boundary +statement ok +CREATE TABLE window_topn_rank_t (id INT, pk INT, val INT) AS VALUES + -- pk=1: ties at rank 2 (val=20 thrice), val=30 jumps to rank 5 + (1, 1, 10), + (2, 1, 20), + (3, 1, 20), + (4, 1, 20), + (5, 1, 30), + -- pk=2: distinct values, no ties + (6, 2, 5), + (7, 2, 15), + (8, 2, 25), + -- pk=3: 100 then four 200s — exercises the boundary-unchanged-with-eviction + -- case from the design doc's worked example (heap fills with three 200s, + -- the fourth ties, then 100 evicts a 200 but new boundary is still 200, + -- so the evicted 200 must move to ties) + (9, 3, 100), + (10, 3, 200), + (11, 3, 200), + (12, 3, 200), + (13, 3, 200), + (14, 3, 300); + +# Test R1: Basic RANK correctness with ties at the boundary. +# Expected per partition (RANK ASC, rk <= 3): +# pk=1: 10 (rk=1), 20×3 (rk=2 each) → 4 rows +# pk=2: 5 (rk=1), 15 (rk=2), 25 (rk=3) → 3 rows +# pk=3: 100 (rk=1), 200×4 (rk=2 each) → 5 rows +# Total: 12 rows kept, val=30 (pk=1, rk=5) and val=300 (pk=3, rk=6) dropped. +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk <= 3; +---- +1 1 10 +10 3 200 +11 3 200 +12 3 200 +13 3 200 +2 1 20 +3 1 20 +4 1 20 +6 2 5 +7 2 15 +8 2 25 +9 3 100 + +# Test R2: EXPLAIN shows PartitionedTopKExec with fn=rank +query TT +EXPLAIN SELECT * FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk <= 3; +---- +logical_plan +01)Projection: window_topn_rank_t.id, window_topn_rank_t.pk, window_topn_rank_t.val, rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rk +02)--Filter: rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(3) +03)----WindowAggr: windowExpr=[[rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_rank_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rk] +02)--BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_rank_t.pk] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test R3: rk < 4 should give the same results (fetch = K-1 = 3) +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk < 4; +---- +1 1 10 +10 3 200 +11 3 200 +12 3 200 +13 3 200 +2 1 20 +3 1 20 +4 1 20 +6 2 5 +7 2 15 +8 2 25 +9 3 100 + +# Test R4: Flipped predicate `3 >= rk` should also trigger optimization +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_t +) WHERE 3 >= rk; +---- +1 1 10 +10 3 200 +11 3 200 +12 3 200 +13 3 200 +2 1 20 +3 1 20 +4 1 20 +6 2 5 +7 2 15 +8 2 25 +9 3 100 + +# Test R5: Flipped strict `4 > rk` should also trigger optimization (fetch=3) +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_t +) WHERE 4 > rk; +---- +1 1 10 +10 3 200 +11 3 200 +12 3 200 +13 3 200 +2 1 20 +3 1 20 +4 1 20 +6 2 5 +7 2 15 +8 2 25 +9 3 100 + +# Test R6: RANK without PARTITION BY — should NOT trigger the optimization +# (global top-K with ties; SortExec with fetch handles this without our rule). +# Use window_topn_rank_t (still alive); window_topn_t was dropped earlier. +query II rowsort +SELECT id, val FROM ( + SELECT *, RANK() OVER (ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk <= 3; +---- +1 10 +6 5 +7 15 + +# Test R7: RANK with multi-column PARTITION BY +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk, id ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk <= 1; +---- +1 1 10 +10 3 200 +11 3 200 +12 3 200 +13 3 200 +14 3 300 +2 1 20 +3 1 20 +4 1 20 +5 1 30 +6 2 5 +7 2 15 +8 2 25 +9 3 100 + +# Test R8: Verify multi-column partition plan still uses fn=rank +query TT +EXPLAIN SELECT * FROM ( + SELECT *, RANK() OVER (PARTITION BY pk, id ORDER BY val) as rk FROM window_topn_rank_t +) WHERE rk <= 1; +---- +logical_plan +01)Projection: window_topn_rank_t.id, window_topn_rank_t.pk, window_topn_rank_t.val, rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rk +02)--Filter: rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(1) +03)----WindowAggr: windowExpr=[[rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_rank_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rk] +02)--BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_rank_t.pk, window_topn_rank_t.id] ORDER BY [window_topn_rank_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=1, partition=[pk@1, id@0], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test R9: RANK with DESC ordering +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val DESC) as rk FROM window_topn_rank_t +) WHERE rk <= 1; +---- +14 3 300 +5 1 30 +8 2 25 + +# Test R10: Mixed window functions — RANK + ROW_NUMBER in the same query. +# Filter is on the RANK column; rule should still fire (matches by col_idx). +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn, + RANK() OVER (PARTITION BY pk ORDER BY val) as rk + FROM window_topn_rank_t +) WHERE rk <= 1; +---- +1 1 10 +6 2 5 +9 3 100 + +# Test R11: QUALIFY form (parser desugars to the same plan) +query IIII rowsort +SELECT id, pk, val, + RANK() OVER (PARTITION BY pk ORDER BY val) as rk +FROM window_topn_rank_t +QUALIFY rk <= 1; +---- +1 1 10 1 +6 2 5 1 +9 3 100 1 + +statement ok +DROP TABLE window_topn_rank_t; + +############################################################################### +# RANK() — equality predicate (negative: rule supports only =/>) +############################################################################### +# +# `extract_window_limit` matches only `<, <=, >, >=`. Equality predicates +# `rk = N` are NOT optimized by this rule (regardless of N). DuckDB +# special-cases `rk = 1` as equivalent to `rk <= 1`; we don't. The two +# tests below pin current behavior so that an accidental rule extension +# (or regression) shows up. + +statement ok +CREATE TABLE window_topn_rank_eq_t (id INT, pk INT, val INT) AS VALUES + (1, 1, 10), (2, 1, 20), (3, 1, 30), + (4, 2, 5), (5, 2, 15), (6, 2, 25); + +# Test R12: `rk = 1` — correct results, but plan should still contain +# FilterExec + SortExec (rule did NOT fire). +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_eq_t +) WHERE rk = 1; +---- +1 1 10 +4 2 5 + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_eq_t +) WHERE rk = 1; +---- +logical_plan +01)Projection: window_topn_rank_eq_t.id, window_topn_rank_eq_t.pk, window_topn_rank_eq_t.val, rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rk +02)--Filter: rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = UInt64(1) +03)----WindowAggr: windowExpr=[[rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_rank_eq_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rk] +02)--FilterExec: rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 = 1 +03)----BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_rank_eq_t.pk] ORDER BY [window_topn_rank_eq_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE window_topn_rank_eq_t; + +############################################################################### +# RANK() — dense-ties boundary preservation +############################################################################### +# +# Heap fills with K=3 rows tied at the same value, then a strictly-better +# row arrives. The heap evicts one of the tied rows, but the new +# K-th-best is still tied with the evicted row (boundary unchanged). +# PartitionedTopKRank must push the evicted row into `ties` rather than +# discarding it. Without that branch, a `rk <= 3` query loses the +# evicted tied row. + +statement ok +CREATE TABLE window_topn_rank_dense_t (id INT, pk INT, val INT) AS VALUES + -- ten rows with the same val + one strictly-better row + (1, 1, 10), (2, 1, 10), (3, 1, 10), (4, 1, 10), (5, 1, 10), + (6, 1, 10), (7, 1, 10), (8, 1, 10), (9, 1, 10), (10, 1, 10), + (11, 1, 5); + +# Test R14: With `rk <= 3`, every row should be retained: +# - val=5 → rk=1 +# - val=10 (×10) → rk=2 each +# Total 11 rows. If the boundary-unchanged-eviction branch ever drops a +# tied row, this query would return fewer than 11. +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_dense_t +) WHERE rk <= 3; +---- +1 1 10 +10 1 10 +11 1 5 +2 1 10 +3 1 10 +4 1 10 +5 1 10 +6 1 10 +7 1 10 +8 1 10 +9 1 10 + +# Test R15: rule fired (no FilterExec/SortExec) +query TT +EXPLAIN SELECT * FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk FROM window_topn_rank_dense_t +) WHERE rk <= 3; +---- +logical_plan +01)Projection: window_topn_rank_dense_t.id, window_topn_rank_dense_t.pk, window_topn_rank_dense_t.val, rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rk +02)--Filter: rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(3) +03)----WindowAggr: windowExpr=[[rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_rank_dense_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rk] +02)--BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_rank_dense_t.pk] ORDER BY [window_topn_rank_dense_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE window_topn_rank_dense_t; + +############################################################################### +# RANK() — NULL handling in ORDER BY +############################################################################### +# +# RANK ASSIGNMENTS WITH NULLS: +# ORDER BY val ASC NULLS LAST → non-NULLs ranked first, NULLs at the end +# ORDER BY val DESC NULLS LAST → same shape, different non-NULL order +# ORDER BY val ASC NULLS FIRST → NULLs all tie at rank 1 +# ORDER BY val DESC NULLS FIRST → NULLs all tie at rank 1 +# +# Multiple NULLs in the same partition all share the same rank (they're +# tied under the encoded ORDER BY). + +statement ok +CREATE TABLE window_topn_rank_null_t (id INT, pk INT, val INT) AS VALUES + -- pk=1: distinct vals plus one NULL → ASC NULLS LAST → 1,2,3,NULL ranks 1,2,3,4 + (1, 1, 1), (2, 1, 2), (3, 1, 3), (4, 1, NULL), + -- pk=2: one non-NULL plus two NULLs → ASC NULLS LAST → 5,NULL,NULL ranks 1,2,2 + (5, 2, 5), (6, 2, NULL), (7, 2, NULL); + +# Test R16: ASC NULLS LAST, rk <= 4 covers everything in pk=1, only rk≤2 +# in pk=2 (since both NULLs tie at rank 2 and there's no rank 3 or 4). +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val ASC NULLS LAST) as rk FROM window_topn_rank_null_t +) WHERE rk <= 4; +---- +1 1 1 +2 1 2 +3 1 3 +4 1 NULL +5 2 5 +6 2 NULL +7 2 NULL + +# Test R17: ASC NULLS LAST, rk <= 2 — pk=1's NULL (rk=4) drops out; +# pk=2's NULLs (rk=2 each) are retained. +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val ASC NULLS LAST) as rk FROM window_topn_rank_null_t +) WHERE rk <= 2; +---- +1 1 1 +2 1 2 +5 2 5 +6 2 NULL +7 2 NULL + +# Test R18: rule fires for NULLS LAST configuration +query TT +EXPLAIN SELECT * FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val ASC NULLS LAST) as rk FROM window_topn_rank_null_t +) WHERE rk <= 2; +---- +logical_plan +01)Projection: window_topn_rank_null_t.id, window_topn_rank_null_t.pk, window_topn_rank_null_t.val, rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rk +02)--Filter: rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2) +03)----WindowAggr: windowExpr=[[rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_rank_null_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rk] +02)--BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_rank_null_t.pk] ORDER BY [window_topn_rank_null_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fn=rank, fetch=2, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test R19: DESC NULLS LAST — pk=1: 3,2,1,NULL ranks 1,2,3,4; pk=2: 5,NULL,NULL ranks 1,2,2. +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val DESC NULLS LAST) as rk FROM window_topn_rank_null_t +) WHERE rk <= 4; +---- +1 1 1 +2 1 2 +3 1 3 +4 1 NULL +5 2 5 +6 2 NULL +7 2 NULL + +# Test R20: ASC NULLS FIRST — pk=1: NULL,1,2,3 ranks 1,2,3,4; +# pk=2: NULL,NULL,5 ranks 1,1,3. With rk <= 2, pk=2's NULLs are kept, +# pk=1 keeps NULL and val=1. +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val ASC NULLS FIRST) as rk FROM window_topn_rank_null_t +) WHERE rk <= 2; +---- +1 1 1 +4 1 NULL +6 2 NULL +7 2 NULL + +statement ok +DROP TABLE window_topn_rank_null_t; + # Reset config to default (false) statement ok SET datafusion.optimizer.enable_window_topn = false; From a274a1bf18494fc073fcbea7808fb0d9956e15b9 Mon Sep 17 00:00:00 2001 From: SubhamSinghal Date: Tue, 30 Jun 2026 22:23:47 +0530 Subject: [PATCH 2/3] Extend WindowTopN to support RANK --- benchmarks/queries/h2o/window.sql | 48 ++ .../tests/physical_optimizer/window_topn.rs | 192 ++++- .../physical-optimizer/src/window_topn.rs | 93 +- .../src/sorts/partitioned_topk.rs | 142 +++- datafusion/physical-plan/src/topk/mod.rs | 794 +++++++++++++++++- 5 files changed, 1197 insertions(+), 72 deletions(-) diff --git a/benchmarks/queries/h2o/window.sql b/benchmarks/queries/h2o/window.sql index 346a8e4713f83..37df0a28ae614 100644 --- a/benchmarks/queries/h2o/window.sql +++ b/benchmarks/queries/h2o/window.sql @@ -148,3 +148,51 @@ SELECT pk, largest2_v2 FROM ( ROW_NUMBER() OVER (PARTITION BY id3 % 100000 ORDER BY v2 DESC) AS order_v2 FROM large WHERE v2 IS NOT NULL ) sub_query WHERE order_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~100 partitions) +-- The RANK queries below mirror the ROW_NUMBER cardinality sweep +-- above and add heavy-ties variants. RANK semantics retain boundary +-- ties (`WHERE rk <= K` may keep more than K rows per partition), so +-- this exercises PartitionedTopKRank's ties-Vec path. +SELECT pk, largest_v2 FROM ( + SELECT (id3 % 100) AS pk, v2 AS largest_v2, + RANK() OVER (PARTITION BY (id3 % 100) ORDER BY v2 DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~1K partitions) +SELECT pkey, largest_v2 FROM ( + SELECT (id3 % 1000) AS pkey, v2 AS largest_v2, + RANK() OVER (PARTITION BY (id3 % 1000) ORDER BY v2 DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~1K partitions, heavy ties) +-- v2 % 10 forces 10 distinct OBY values, so most rows tie at the boundary +-- and exercise PartitionedTopKRank's ties-Vec path. +SELECT pkey, largest_v2 FROM ( + SELECT (id3 % 1000) AS pkey, v2 AS largest_v2, + RANK() OVER (PARTITION BY (id3 % 1000) ORDER BY (v2 % 10) DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~10K partitions, low ties) +SELECT id2, largest_v2 FROM ( + SELECT id2, v2 AS largest_v2, + RANK() OVER (PARTITION BY id2 ORDER BY v2 DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~10K partitions, heavy ties) +SELECT id2, largest_v2 FROM ( + SELECT id2, v2 AS largest_v2, + RANK() OVER (PARTITION BY id2 ORDER BY (v2 % 10) DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; + +-- Window Top-N (RANK top-2 per partition, ~100K partitions) +SELECT pk, largest_v2 FROM ( + SELECT (id3 % 100000) AS pk, v2 AS largest_v2, + RANK() OVER (PARTITION BY (id3 % 100000) ORDER BY v2 DESC) AS rk_v2 + FROM large WHERE v2 IS NOT NULL +) sub_query WHERE rk_v2 <= 2; diff --git a/datafusion/core/tests/physical_optimizer/window_topn.rs b/datafusion/core/tests/physical_optimizer/window_topn.rs index e3f73a85353cc..07a1db127ec54 100644 --- a/datafusion/core/tests/physical_optimizer/window_topn.rs +++ b/datafusion/core/tests/physical_optimizer/window_topn.rs @@ -25,6 +25,7 @@ use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; use datafusion_expr::Operator; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; +use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf}; use datafusion_functions_window::row_number::row_number_udwf; use datafusion_physical_expr::expressions::{BinaryExpr, Column, col, lit}; use datafusion_physical_expr::window::StandardWindowExpr; @@ -226,7 +227,7 @@ fn basic_row_number_rn_lteq_3() -> Result<()> { let optimized = optimize(plan)?; assert_snapshot!(plan_str(optimized.as_ref()), @r#" BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@0], order=[val@1 ASC] PlaceholderRowExec "#); Ok(()) @@ -238,7 +239,7 @@ fn rn_lt_3_becomes_fetch_2() -> Result<()> { let optimized = optimize(plan)?; assert_snapshot!(plan_str(optimized.as_ref()), @r#" BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - PartitionedTopKExec: fetch=2, partition=[pk@0], order=[val@1 ASC] + PartitionedTopKExec: fn=row_number, fetch=2, partition=[pk@0], order=[val@1 ASC] PlaceholderRowExec "#); Ok(()) @@ -300,7 +301,7 @@ fn flipped_3_gteq_rn() -> Result<()> { let optimized = optimize(plan)?; assert_snapshot!(plan_str(optimized.as_ref()), @r#" BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@0], order=[val@1 ASC] PlaceholderRowExec "#); Ok(()) @@ -418,8 +419,191 @@ fn with_projection_between() -> Result<()> { assert_snapshot!(plan_str(optimized.as_ref()), @r#" ProjectionExec: expr=[pk@0 as pk, val@1 as val, row_number@2 as row_number] BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PartitionedTopKExec: fn=row_number, fetch=3, partition=[pk@0], order=[val@1 ASC] PlaceholderRowExec "#); Ok(()) } + +// ---------------------------------------------------------------------- +// RANK rule tests +// ---------------------------------------------------------------------- + +/// Build: FilterExec(rk op limit) → BoundedWindowAggExec( PBY pk OBY val) → SortExec(pk, val) +/// +/// `udwf_factory` selects the window UDWF (rank, dense_rank, ...) and +/// `udwf_name` is the column name produced by that UDWF (matters because +/// the rule resolves the filter column by index, but the snapshot prints +/// the name). +fn build_ranking_topn_plan( + udwf_factory: fn() -> Arc, + udwf_name: &str, + limit_value: i64, + op: Operator, +) -> Result> { + let s = schema(); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("pk", &s)?).asc(), + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]) + .unwrap(); + + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); + + let partition_by = vec![col("pk", &s)?]; + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr(&udwf_factory(), &[], &s, udwf_name.to_string(), false)?, + &partition_by, + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + let rk_col = Arc::new(Column::new(udwf_name, 2)); + let limit_lit = lit(ScalarValue::UInt64(Some(limit_value as u64))); + // Place column on whichever side matches the operator's expectation. + let predicate: Arc = match op { + Operator::LtEq | Operator::Lt => Arc::new(BinaryExpr::new(rk_col, op, limit_lit)), + Operator::GtEq | Operator::Gt => Arc::new(BinaryExpr::new(limit_lit, op, rk_col)), + _ => unreachable!("only =/> are supported by the rule"), + }; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + + Ok(filter) +} + +/// Build a RANK plan with NO ORDER BY: every row ties at rank 1 — degenerate. +fn build_rank_no_order_by_plan(limit_value: i64) -> Result> { + let s = schema(); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + let ordering = + LexOrdering::new(vec![PhysicalSortExpr::new_default(col("pk", &s)?).asc()]) + .unwrap(); + + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); + + let partition_by = vec![col("pk", &s)?]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr(&rank_udwf(), &[], &s, "rank".to_string(), false)?, + &partition_by, + &[], // empty ORDER BY + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + let rk_col = Arc::new(Column::new("rank", 2)); + let limit_lit = lit(ScalarValue::UInt64(Some(limit_value as u64))); + let predicate = Arc::new(BinaryExpr::new(rk_col, Operator::LtEq, limit_lit)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + + Ok(filter) +} + +#[test] +fn basic_rank_rk_lteq_3() -> Result<()> { + let plan = build_ranking_topn_plan(rank_udwf, "rank", 3, Operator::LtEq)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[rank: Field { "rank": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn rank_rk_lt_4_becomes_fetch_3() -> Result<()> { + let plan = build_ranking_topn_plan(rank_udwf, "rank", 4, Operator::Lt)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[rank: Field { "rank": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn rank_flipped_3_gteq_rk() -> Result<()> { + let plan = build_ranking_topn_plan(rank_udwf, "rank", 3, Operator::GtEq)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[rank: Field { "rank": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn rank_flipped_4_gt_rk_becomes_fetch_3() -> Result<()> { + let plan = build_ranking_topn_plan(rank_udwf, "rank", 4, Operator::Gt)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[rank: Field { "rank": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fn=rank, fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn rank_no_order_by_no_change() -> Result<()> { + // Without ORDER BY, every row ties at rank 1 — the optimization is + // degenerate (entire input would be retained, ties storage unbounded). + // The rule must skip. + let plan = build_rank_no_order_by_plan(3)?; + let before = plan_str(plan.as_ref()); + let optimized = optimize(plan)?; + let after = plan_str(optimized.as_ref()); + assert_eq!( + before, after, + "RANK with empty ORDER BY must not be rewritten" + ); + Ok(()) +} + +#[test] +fn dense_rank_no_change() -> Result<()> { + // DENSE_RANK is not yet supported by the rule. The plan must pass + // through unchanged. + let plan = build_ranking_topn_plan(dense_rank_udwf, "dense_rank", 3, Operator::LtEq)?; + let before = plan_str(plan.as_ref()); + let optimized = optimize(plan)?; + let after = plan_str(optimized.as_ref()); + assert_eq!( + before, after, + "DENSE_RANK is unsupported and must not be rewritten" + ); + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/window_topn.rs b/datafusion/physical-optimizer/src/window_topn.rs index 40dbddfbdf9fb..3f88e86c67324 100644 --- a/datafusion/physical-optimizer/src/window_topn.rs +++ b/datafusion/physical-optimizer/src/window_topn.rs @@ -26,12 +26,27 @@ //! ) WHERE rn <= K; //! ``` //! +//! or with `RANK()` in place of `ROW_NUMBER()`: +//! +//! ```sql +//! SELECT * FROM ( +//! SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rk +//! FROM t +//! ) WHERE rk <= K; +//! ``` +//! //! And replaces the `FilterExec → BoundedWindowAggExec → SortExec` pipeline //! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both //! the `FilterExec` and `SortExec`. //! -//! See [`PartitionedTopKExec`] -//! for details on the replacement operator. +//! The appropriate [`WindowFnKind`] is forwarded to `PartitionedTopKExec`. +//! RANK requires a non-empty `ORDER BY` clause (otherwise all rows tie at +//! rank 1 and the optimization is degenerate). +//! +//! See [`PartitionedTopKExec`] for details on the replacement operator. +//! +//! [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec +//! [`WindowFnKind`]: datafusion_physical_plan::sorts::partitioned_topk::WindowFnKind use std::sync::Arc; @@ -46,19 +61,22 @@ use datafusion_physical_expr::window::StandardWindowExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::projection::ProjectionExec; -use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec; +use datafusion_physical_plan::sorts::partitioned_topk::{ + PartitionedTopKExec, WindowFnKind, +}; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; -/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K -/// queries into a more efficient plan using [`PartitionedTopKExec`]. +/// Physical optimizer rule that converts per-partition `ROW_NUMBER` and +/// `RANK` top-K queries into a more efficient plan using +/// [`PartitionedTopKExec`]. /// /// # Pattern Detected /// /// ```text -/// FilterExec(rn <= K) +/// FilterExec( <= K) /// [optional ProjectionExec] -/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) +/// BoundedWindowAggExec( PARTITION BY ... ORDER BY ...) /// SortExec(partition_keys, order_keys) /// ``` /// @@ -66,13 +84,13 @@ use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; /// /// ```text /// [optional ProjectionExec] -/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) -/// PartitionedTopKExec(partition_keys, order_keys, fetch=K) +/// BoundedWindowAggExec( PARTITION BY ... ORDER BY ...) +/// PartitionedTopKExec(fn=, partition_keys, order_keys, fetch=K) /// ``` /// -/// The `FilterExec` is removed entirely (all output rows have `rn ∈ {1..K}`). -/// The `SortExec` is replaced by `PartitionedTopKExec` which maintains a -/// per-partition top-K heap instead of sorting the entire dataset. +/// The `FilterExec` is removed entirely. The `SortExec` is replaced by +/// `PartitionedTopKExec`, which maintains a per-partition top-K heap (and, +/// for `RANK`, a sibling ties `Vec`) instead of sorting the whole dataset. /// /// # Supported Predicates /// @@ -86,9 +104,12 @@ use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; /// All of the following must be true: /// - Config flag `enable_window_topn` is `true` /// - The plan matches `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec` -/// - The window function is `ROW_NUMBER` (not `RANK`, `DENSE_RANK`, etc.) -/// - `ROW_NUMBER` has a `PARTITION BY` clause (global top-K is already -/// handled by `SortExec` with `fetch`) +/// - The window function is `ROW_NUMBER` or `RANK` (not `DENSE_RANK`) +/// - The window function has a `PARTITION BY` clause (global top-K is +/// already handled by `SortExec` with `fetch`) +/// - For `RANK`: a non-empty `ORDER BY` clause (otherwise all rows tie +/// at rank 1 — the optimization is useless and the boundary-tie storage +/// would be unbounded) /// - The filter predicate compares the window output column to an integer /// literal using `<=`, `<`, `>=`, or `>` /// @@ -123,7 +144,7 @@ impl WindowTopN { let child = filter.input(); let (window_exec, proj_between) = find_window_below(child)?; - // Step 4: Verify col_idx references a ROW_NUMBER window output column + // Step 4: Verify col_idx references a supported window function output column let input_field_count = window_exec.input().schema().fields().len(); if col_idx < input_field_count { return None; // Filter is on an input column, not a window column @@ -133,9 +154,7 @@ impl WindowTopN { if window_expr_idx >= window_exprs.len() { return None; } - if !is_row_number(&window_exprs[window_expr_idx]) { - return None; - } + let fn_kind = supported_window_fn(&window_exprs[window_expr_idx])?; // Step 5: Verify child of window is SortExec let sort_exec = window_exec.input().downcast_ref::()?; @@ -151,12 +170,22 @@ impl WindowTopN { return None; } + // For RANK: an empty ORDER BY makes every row tie at rank 1 — + // the optimization is degenerate (we'd retain the entire input) + // and tie storage would be unbounded. + if matches!(fn_kind, WindowFnKind::Rank) + && window_exprs[window_expr_idx].order_by().is_empty() + { + return None; + } + // Step 7: Build PartitionedTopKExec using SortExec's expressions let partitioned_topk = PartitionedTopKExec::try_new( Arc::clone(sort_child), sort_exec.expr().clone(), partition_prefix_len, limit_n, + fn_kind, ) .ok()?; @@ -287,20 +316,24 @@ fn scalar_to_usize(value: &ScalarValue) -> Option { } } -/// Check if a window expression is `ROW_NUMBER`. +/// Identify which supported ranking window function `expr` is. /// /// Downcasts through `StandardWindowExpr` → `WindowUDFExpr` and checks -/// that the UDF name is `"row_number"`. Returns `false` for all other -/// window functions (e.g., `RANK`, `DENSE_RANK`, `SUM`). -fn is_row_number(expr: &Arc) -> bool { - let Some(swe) = expr.as_any().downcast_ref::() else { - return false; - }; +/// the UDF name. Returns: +/// - `Some(WindowFnKind::RowNumber)` for `"row_number"` +/// - `Some(WindowFnKind::Rank)` for `"rank"` +/// - `None` for everything else (e.g. `dense_rank`) +fn supported_window_fn( + expr: &Arc, +) -> Option { + let swe = expr.as_any().downcast_ref::()?; let swfe = swe.get_standard_func_expr(); - let Some(udf) = swfe.as_any().downcast_ref::() else { - return false; - }; - udf.fun().name() == "row_number" + let udf = swfe.as_any().downcast_ref::()?; + match udf.fun().name() { + "row_number" => Some(WindowFnKind::RowNumber), + "rank" => Some(WindowFnKind::Rank), + _ => None, + } } /// Walk below a plan node looking for a [`BoundedWindowAggExec`]. diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 1040abcb75ea9..4491b9a161849 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -23,10 +23,13 @@ //! FROM t WHERE rn <= N //! ``` //! -//! Instead of sorting the entire dataset, this operator maintains a -//! [`TopK`](crate::topk::TopK) heap per partition (reusing the existing TopK implementation) -//! and emits only the top-K rows per partition in sorted order -//! `(partition_keys, order_keys)`. +//! Instead of sorting the entire dataset, this operator delegates to +//! [`PartitionedTopK`] (for `ROW_NUMBER`) or [`PartitionedTopKRank`] +//! (for `RANK`), both of which maintain one heap per distinct partition +//! key while sharing a single [`arrow::row::RowConverter`], +//! [`MemoryReservation`](datafusion_execution::memory_pool::MemoryReservation), +//! and metrics set across all partitions, and emit only the top-K rows +//! per partition in sorted order `(partition_keys, order_keys)`. use std::fmt::{self, Formatter}; use std::sync::Arc; @@ -43,12 +46,27 @@ use futures::TryStreamExt; use crate::execution_plan::{Boundedness, EmissionType}; use crate::metrics::ExecutionPlanMetricsSet; -use crate::topk::{PartitionedTopK, build_sort_fields}; +use crate::topk::{PartitionedTopK, PartitionedTopKRank, build_sort_fields}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter, }; +/// Which window function `PartitionedTopKExec` is optimizing. +/// +/// Different ranking functions have different per-partition retention rules: +/// - [`RowNumber`](Self::RowNumber): exactly K rows per partition. +/// - [`Rank`](Self::Rank): K rows plus any rows tied at the boundary +/// ORDER BY value (RANK semantics — `WHERE rk <= K` may keep more +/// than K rows when ties straddle the boundary). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WindowFnKind { + /// `ROW_NUMBER()` — keep exactly K rows per partition. + RowNumber, + /// `RANK()` — keep K rows plus any rows tied at the boundary. + Rank, +} + /// Per-partition Top-K operator for window function queries. /// /// # Background @@ -89,9 +107,14 @@ use crate::{ /// DataSourceExec /// ``` /// -/// Instead of sorting the entire dataset, this operator reads unsorted input, -/// maintains a [`TopK`](crate::topk::TopK) heap per distinct partition key, and emits only the -/// top-K rows per partition in sorted order `(partition_keys, order_keys)`. +/// Instead of sorting the entire dataset, this operator reads unsorted input +/// and delegates to [`PartitionedTopK`] (for `ROW_NUMBER`) or +/// [`PartitionedTopKRank`] (for `RANK`), each maintaining one heap per +/// distinct partition key while sharing a single +/// [`arrow::row::RowConverter`] / +/// [`MemoryReservation`](datafusion_execution::memory_pool::MemoryReservation) +/// across all partitions, and emits only the top-K rows per partition in +/// sorted order `(partition_keys, order_keys)`. /// /// Cost: O(N log K) time instead of O(N log N), and O(K × P × row_size) /// memory where K = fetch, P = number of distinct partitions. @@ -164,6 +187,9 @@ pub struct PartitionedTopKExec { /// Derived from the filter predicate: `rn <= 3` → `fetch = 3`, /// `rn < 3` → `fetch = 2`. fetch: usize, + /// Which window function this operator is optimizing. Selects the + /// per-partition retention policy (see [`WindowFnKind`]). + fn_kind: WindowFnKind, /// Execution metrics metrics_set: ExecutionPlanMetricsSet, /// Cached plan properties (output ordering, partitioning, etc.) @@ -181,6 +207,8 @@ impl PartitionedTopKExec { /// * `partition_prefix_len` - Number of leading expressions in `expr` /// that form the partition key. Must be >= 1. /// * `fetch` - Maximum rows to retain per partition (the K in "top-K"). + /// * `fn_kind` - Which ranking window function this operator optimizes + /// ([`WindowFnKind::RowNumber`] or [`WindowFnKind::Rank`]). /// /// # Example /// @@ -191,6 +219,7 @@ impl PartitionedTopKExec { /// LexOrdering([store ASC, revenue DESC]), /// 1, // partition_prefix_len: 1 partition column (store) /// 5, // fetch: keep top 5 per partition + /// WindowFnKind::RowNumber, /// ) /// ``` pub fn try_new( @@ -198,6 +227,7 @@ impl PartitionedTopKExec { expr: LexOrdering, partition_prefix_len: usize, fetch: usize, + fn_kind: WindowFnKind, ) -> Result { let cache = Self::compute_properties(&input, expr.clone())?; Ok(Self { @@ -205,6 +235,7 @@ impl PartitionedTopKExec { expr, partition_prefix_len, fetch, + fn_kind, metrics_set: ExecutionPlanMetricsSet::new(), cache: Arc::new(cache), }) @@ -231,6 +262,11 @@ impl PartitionedTopKExec { self.fetch } + /// Returns which window function this operator is optimizing. + pub fn fn_kind(&self) -> WindowFnKind { + self.fn_kind + } + /// Compute [`PlanProperties`] for this operator. /// /// The output is sorted by `sort_exprs` (partition keys then order keys), @@ -254,6 +290,10 @@ impl PartitionedTopKExec { impl DisplayAs for PartitionedTopKExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + let fn_label = match self.fn_kind { + WindowFnKind::RowNumber => "row_number", + WindowFnKind::Rank => "rank", + }; match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let partition_exprs: Vec = self.expr[..self.partition_prefix_len] @@ -266,7 +306,8 @@ impl DisplayAs for PartitionedTopKExec { .collect(); write!( f, - "PartitionedTopKExec: fetch={}, partition=[{}], order=[{}]", + "PartitionedTopKExec: fn={}, fetch={}, partition=[{}], order=[{}]", + fn_label, self.fetch, partition_exprs.join(", "), order_exprs.join(", "), @@ -281,6 +322,7 @@ impl DisplayAs for PartitionedTopKExec { .iter() .map(|e| format!("{e}")) .collect(); + writeln!(f, "fn={fn_label}")?; writeln!(f, "fetch={}", self.fetch)?; writeln!(f, "partition=[{}]", partition_exprs.join(", "))?; writeln!(f, "order=[{}]", order_exprs.join(", ")) @@ -325,6 +367,7 @@ impl ExecutionPlan for PartitionedTopKExec { self.expr.clone(), self.partition_prefix_len, self.fetch, + self.fn_kind, )?)) } @@ -348,6 +391,7 @@ impl ExecutionPlan for PartitionedTopKExec { LexOrdering::new(self.expr[self.partition_prefix_len..].iter().cloned()) .expect("PartitionedTopKExec requires at least one order-by expression"); let fetch = self.fetch; + let fn_kind = self.fn_kind; let batch_size = context.session_config().batch_size(); let runtime = Arc::clone(&context.runtime_env()); let metrics_set = self.metrics_set.clone(); @@ -361,6 +405,7 @@ impl ExecutionPlan for PartitionedTopKExec { partition_sort_fields, order_expr, fetch, + fn_kind, batch_size, runtime, metrics_set, @@ -376,25 +421,29 @@ impl ExecutionPlan for PartitionedTopKExec { } } -/// Read all input, feed each batch into a [`PartitionedTopK`] (which -/// maintains one heap per distinct partition key), then emit results -/// ordered by `(partition_keys, order_keys)`. +/// Read all input, feed each batch into a per-partition top-K state +/// (either [`PartitionedTopK`] for `ROW_NUMBER` or +/// [`PartitionedTopKRank`] for `RANK`), then emit results ordered by +/// `(partition_keys, order_keys)`. /// /// # Phases /// -/// 1. **Accumulation** — forward each input `RecordBatch` to -/// [`PartitionedTopK::insert_batch`], which demultiplexes rows by -/// partition key and dispatches them into the per-key heap. The -/// `RowConverter` and `MemoryReservation` are shared across all -/// partitions for this operator instance. +/// 1. **Accumulation** — forward each input `RecordBatch` to the +/// per-partition state's `insert_batch`. The `RowConverter` for +/// ORDER BY columns, the operator's `MemoryReservation`, and the +/// `TopKMetrics` are shared across all distinct partition keys for +/// this operator instance. /// -/// 2. **Emission** — [`PartitionedTopK::emit`] drains all heaps in -/// sorted partition-key order, returning a coalesced batch stream. +/// 2. **Emission** — `emit` drains all per-partition heaps in sorted +/// partition-key order, returning a coalesced batch stream. For +/// `RANK`, boundary-tied rows are materialized and emitted after +/// each partition's heap rows. /// /// # Cost /// /// - Time: O(N log K) where N = total rows, K = fetch /// - Memory: O(K × P × row_size) where P = number of distinct partitions +/// plus, for RANK, the boundary ties' rows #[expect(clippy::too_many_arguments)] async fn do_partitioned_topk( partition_id: usize, @@ -404,26 +453,47 @@ async fn do_partitioned_topk( partition_sort_fields: Vec, order_expr: LexOrdering, fetch: usize, + fn_kind: WindowFnKind, batch_size: usize, runtime: Arc, metrics_set: ExecutionPlanMetricsSet, ) -> Result { - let mut state = PartitionedTopK::try_new( - partition_id, - schema, - partition_exprs, - partition_sort_fields, - order_expr, - fetch, - batch_size, - &runtime, - &metrics_set, - )?; - - while let Some(batch) = input.next().await { - state.insert_batch(&batch?)?; + match fn_kind { + WindowFnKind::RowNumber => { + let mut state = PartitionedTopK::try_new( + partition_id, + schema, + partition_exprs, + partition_sort_fields, + order_expr, + fetch, + batch_size, + &runtime, + &metrics_set, + )?; + while let Some(batch) = input.next().await { + state.insert_batch(&batch?)?; + } + drop(input); + state.emit() + } + WindowFnKind::Rank => { + let mut state = PartitionedTopKRank::try_new( + partition_id, + schema, + partition_exprs, + partition_sort_fields, + order_expr, + fetch, + batch_size, + &runtime, + &metrics_set, + )?; + while let Some(batch) = input.next().await { + state.insert_batch(&batch?)?; + } + drop(input); + state.emit() + } } - drop(input); - - state.emit() } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index ee8675d7183b1..1e3efff36b1d8 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -307,6 +307,24 @@ impl TopKDynamicFilters { // Guesstimate for memory allocation: estimated number of bytes used per row in the RowConverter const ESTIMATED_BYTES_PER_ROW: usize = 20; +/// Owned data of a row that was just evicted from a [`TopKHeap`]. +/// +/// Returned by [`TopKHeap::add`] so that callers (e.g. rank-aware +/// wrappers that retain boundary ties) can decide whether to retain +/// the evicted row externally. The underlying batch is captured +/// before the heap's internal `RecordBatchStore` decrements the +/// batch's use count, so the data remains accessible even if the +/// heap drops its internal reference to the batch. +#[derive(Debug, Clone)] +pub(crate) struct EvictedRow { + /// The record batch the evicted row came from. + pub batch: RecordBatch, + /// Row index within `batch`. + pub index: usize, + /// Encoded ORDER BY tuple for the evicted row, in [`arrow::row`] format. + pub row_bytes: Vec, +} + pub(crate) fn build_sort_fields( ordering: &[PhysicalSortExpr], schema: &SchemaRef, @@ -895,12 +913,16 @@ impl TopKHeap { /// Adds `row` to this heap. If inserting this new item would /// increase the size past `k`, removes the previously smallest /// item. + /// + /// Returns `Some(EvictedRow)` if an existing row was evicted to + /// make room for `row`, or `None` if the row was inserted into a + /// non-full heap. fn add( &mut self, batch_entry: &mut RecordBatchEntry, row: impl AsRef<[u8]>, index: usize, - ) { + ) -> Option { let batch_id = batch_entry.id; batch_entry.uses += 1; @@ -911,6 +933,26 @@ impl TopKHeap { if self.inner.len() == self.k { let mut prev_min = self.inner.peek_mut().unwrap(); + // Capture evicted row data before `unuse` (which may GC the + // batch from the store) and `replace_with` (which overwrites + // `prev_min` in place). The batch comes from `self.store` for + // cross-batch evictions, or directly from `batch_entry` when + // a row evicts another row from the same in-flight batch + // (entry not yet registered in the store). + let evicted_batch = if prev_min.batch_id == batch_entry.id { + batch_entry.batch.clone() + } else { + self.store + .get(prev_min.batch_id) + .map(|entry| entry.batch.clone()) + .expect("evicted row's batch must be present in the store") + }; + let evicted = EvictedRow { + batch: evicted_batch, + index: prev_min.index, + row_bytes: prev_min.row.clone(), + }; + // Update batch use if prev_min.batch_id == batch_entry.id { batch_entry.uses -= 1; @@ -924,12 +966,15 @@ impl TopKHeap { prev_min.replace_with(row, batch_id, index); self.owned_bytes += prev_min.owned_size(); + + Some(evicted) } else { let new_row = TopKRow::new(row, batch_id, index); self.owned_bytes += new_row.owned_size(); // put the new row into the heap self.inner.push(new_row); - }; + None + } } /// Returns the values stored in this heap, from values low to @@ -1414,6 +1459,358 @@ impl PartitionedTopK { } } +/// A run of rows from a single source [`RecordBatch`] that tied at the +/// boundary when inserted. Stored as `(batch, indices)` and materialized +/// at emit time via [`take_record_batch`]. +#[derive(Debug)] +struct TieEntry { + batch: RecordBatch, + /// Indices into `batch` of the rows tied at the (then-current) + /// boundary. Always non-empty by construction. + row_indices: Vec, + /// `get_record_batch_memory_size(&batch)` captured at push time so + /// `RankPartitionState::size()` doesn't recurse through `batch`'s + /// columns on every `try_resize` call. + batch_bytes: usize, +} + +/// Per-partition state for `RANK()` semantics. +/// +/// Composes [`TopKHeap`] as the K-bounded core plus a sibling +/// `Vec` for boundary-tied rows. `RANK ≤ K` keeps the K +/// best rows by ORDER BY plus every row tied at the K-th-best +/// ORDER BY value — the boundary. So the total retained rows can +/// exceed K when ties straddle the boundary. +struct RankPartitionState { + heap: TopKHeap, + ties: Vec, +} + +impl RankPartitionState { + fn size(&self) -> usize { + let ties_buffer = self.ties.capacity() * size_of::(); + let ties_contents: usize = self + .ties + .iter() + .map(|t| t.row_indices.capacity() * size_of::() + t.batch_bytes) + .sum(); + self.heap.size() + ties_buffer + ties_contents + } +} + +/// Sibling to [`PartitionedTopK`] implementing `RANK()` semantics. +/// +/// Per partition, retains the K-best rows plus every row tied at the +/// K-th-best ORDER BY value (so `WHERE rk <= K` may keep more than K +/// rows when ties straddle the boundary). Like [`PartitionedTopK`], +/// the [`RowConverter`], [`MemoryReservation`], scratch [`Rows`] +/// buffer, and [`TopKMetrics`] are shared across all partitions for +/// this operator instance. +/// +/// # Algorithm (per row) +/// +/// For each incoming row, compare its encoded ORDER BY bytes against +/// `heap.max()` — the K-th-best row, which is by definition the +/// admission boundary. `heap.max()` is `None` until the heap fills +/// to K rows: +/// +/// - heap not full (`max() == None`) → forward to the heap +/// - row's ob `==` max → push to ties (no heap call) +/// - row's ob `>` max → drop +/// - row's ob `<` max → forward to heap; on eviction, compare the +/// new `heap.max()` to the evicted row's bytes: if equal, push +/// evicted to ties (still tied at the new boundary's rank); else +/// clear ties (boundary moved up, old ties no longer satisfy +/// `rk ≤ K`) +pub(crate) struct PartitionedTopKRank { + schema: SchemaRef, + metrics: TopKMetrics, + reservation: MemoryReservation, + /// ORDER BY expressions (excludes PARTITION BY). + expr: LexOrdering, + /// Encoder for ORDER BY columns. Reused across partitions. + row_converter: RowConverter, + /// Scratch row buffer reused across `insert_batch` calls. + scratch_rows: Rows, + /// PARTITION BY expressions. + partition_exprs: Vec>, + /// Encoder for the partition key. + partition_converter: RowConverter, + /// Scratch row buffer for partition-key encoding. Reused across + /// `insert_batch` calls (cleared + appended each batch) so we + /// avoid allocating a fresh `Rows` buffer every batch. + partition_scratch_rows: Rows, + /// One rank state per distinct partition key seen so far. + states: HashMap, + k: usize, + batch_size: usize, +} + +impl PartitionedTopKRank { + #[expect(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition_id: usize, + schema: SchemaRef, + partition_exprs: Vec>, + partition_sort_fields: Vec, + order_expr: LexOrdering, + k: usize, + batch_size: usize, + runtime: &Arc, + metrics: &ExecutionPlanMetricsSet, + ) -> Result { + assert!(k > 0, "PartitionedTopKRank requires k > 0"); + let reservation = + MemoryConsumer::new(format!("PartitionedTopKRank[{partition_id}]")) + .register(&runtime.memory_pool); + + let order_sort_fields = build_sort_fields(&order_expr, &schema)?; + let row_converter = RowConverter::new(order_sort_fields)?; + let scratch_rows = + row_converter.empty_rows(batch_size, ESTIMATED_BYTES_PER_ROW * batch_size); + + let partition_converter = RowConverter::new(partition_sort_fields)?; + let partition_scratch_rows = partition_converter + .empty_rows(batch_size, ESTIMATED_BYTES_PER_ROW * batch_size); + + Ok(Self { + schema, + metrics: TopKMetrics::new(metrics, partition_id), + reservation, + expr: order_expr, + row_converter, + scratch_rows, + partition_exprs, + partition_converter, + partition_scratch_rows, + states: HashMap::new(), + k, + batch_size, + }) + } + + /// Demultiplex `batch` rows by partition key, encode the ORDER BY + /// columns once for the whole batch, and feed each partition's + /// rows through the rank classifier into its dedicated heap and + /// ties Vec. + pub(crate) fn insert_batch(&mut self, batch: &RecordBatch) -> Result<()> { + let baseline = self.metrics.baseline.clone(); + let _timer = baseline.elapsed_compute().timer(); + + let num_rows = batch.num_rows(); + if num_rows == 0 { + return Ok(()); + } + + // Captured once so the per-tie push from this batch can reuse + // it (computing `get_record_batch_memory_size` is O(cols × + // buffer walk) and we'd otherwise pay it per push and again + // per `try_resize` call). + let input_batch_bytes = get_record_batch_memory_size(batch); + + // 1. Evaluate + encode partition columns into the reusable + // scratch (cleared then appended). + let pk_arrays: Vec = self + .partition_exprs + .iter() + .map(|e| e.evaluate(batch).and_then(|v| v.into_array(num_rows))) + .collect::>()?; + self.partition_scratch_rows.clear(); + self.partition_converter + .append(&mut self.partition_scratch_rows, &pk_arrays)?; + let pk_rows = &self.partition_scratch_rows; + + // 2. Demultiplex row indices by partition key (per-batch). + let mut groups: HashMap> = HashMap::new(); + for i in 0..num_rows { + groups + .entry(pk_rows.row(i).owned()) + .or_default() + .push(i as u32); + } + + // 3. Evaluate ORDER BY columns on the full batch and encode ONCE. + let ob_arrays: Vec = self + .expr + .iter() + .map(|e| e.expr.evaluate(batch).and_then(|v| v.into_array(num_rows))) + .collect::>()?; + self.scratch_rows.clear(); + self.row_converter + .append(&mut self.scratch_rows, &ob_arrays)?; + + // 4. Per-partition: classify each row and dispatch. + let k = self.k; + let mut replacements: usize = 0; + + for (pk, indices) in groups { + let state = self.states.entry(pk).or_insert_with(|| RankPartitionState { + heap: TopKHeap::new(k), + ties: Vec::new(), + }); + + // Equal indices for THIS batch only. Coalesced into a single + // tie entry at the end of the partition's loop. Discarded if + // the boundary moves up mid-loop (those rows were tied to the + // old boundary, which is now strictly worse than the new K-th). + let mut equal_indices: Vec = Vec::new(); + // Lazy-registered: only attached if at least one row reaches + // the heap from this batch in this partition. + let mut entry: Option = None; + + for &orig_idx in &indices { + let row = self.scratch_rows.row(orig_idx as usize); + + // Classify against the current K-th-best (the heap top). + // `heap.max()` returns `None` while the heap is filling, + // so unclassified rows fall through to the heap path. + let classification = state + .heap + .max() + .map(|max_row| row.as_ref().cmp(max_row.row())); + + match classification { + Some(Ordering::Equal) => { + equal_indices.push(orig_idx); + continue; + } + Some(Ordering::Greater) => continue, + Some(Ordering::Less) | None => { + // Heap path: heap not yet full, or row strictly + // better than the current boundary. + let entry_ref = entry.get_or_insert_with(|| { + state.heap.register_batch(batch.clone()) + }); + if let Some(EvictedRow { + batch: evicted_batch, + index: evicted_index, + row_bytes: evicted_bytes, + }) = state.heap.add(entry_ref, row, orig_idx as usize) + { + // Compare the new boundary (post-eviction heap + // top) against the evicted row's bytes — both + // already in encoded form, no clones needed. + let boundary_changed = state + .heap + .max() + .expect("heap was full to evict; must still be full") + .row() + != evicted_bytes.as_slice(); + if boundary_changed { + // Boundary moved up — prior ties (across + // all prior batches) and equal_indices + // accumulated earlier in THIS batch were + // tied to the old boundary, now strictly + // worse than the new K-th-best. Discard. + state.ties.clear(); + equal_indices.clear(); + } else { + // Boundary unchanged — evicted row is tied + // at the (unchanged) boundary; push as a + // single-row entry. + let batch_bytes = + get_record_batch_memory_size(&evicted_batch); + state.ties.push(TieEntry { + batch: evicted_batch, + row_indices: vec![evicted_index as u32], + batch_bytes, + }); + } + } + replacements += 1; + } + } + } + + if let Some(e) = entry { + state.heap.insert_batch_entry(e); + state.heap.maybe_compact()?; + } + + // Commit this batch's ties as a single entry. + if !equal_indices.is_empty() { + state.ties.push(TieEntry { + batch: batch.clone(), + row_indices: equal_indices, + batch_bytes: input_batch_bytes, + }); + } + } + + if replacements > 0 { + self.metrics.row_replacements.add(replacements); + } + self.reservation.try_resize(self.size())?; + Ok(()) + } + + /// Drain all heaps and ties in partition-key order and return the + /// rows as a stream of coalesced [`RecordBatch`]es ordered by + /// `(partition_keys, order_keys)`. Within a partition, heap rows + /// come first (sorted by ob), then tie rows (all sharing the + /// boundary ob). + pub(crate) fn emit(self) -> Result { + let Self { + schema, + metrics, + reservation: _, + expr: _, + row_converter: _, + scratch_rows: _, + partition_exprs: _, + partition_converter: _, + partition_scratch_rows: _, + mut states, + k: _, + batch_size, + } = self; + let _timer = metrics.baseline.elapsed_compute().timer(); + + let mut sorted_pks: Vec = states.keys().cloned().collect(); + sorted_pks.sort(); + + let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), batch_size); + + for pk in sorted_pks { + let RankPartitionState { mut heap, ties, .. } = + states.remove(&pk).expect("key from states.keys()"); + if let Some(batch) = heap.emit()? { + (&batch).record_output(&metrics.baseline); + coalescer.push_batch(batch)?; + } + for tie in ties { + let indices = UInt32Array::from(tie.row_indices); + let tie_batch = take_record_batch(&tie.batch, &indices)?; + (&tie_batch).record_output(&metrics.baseline); + coalescer.push_batch(tie_batch)?; + } + } + coalescer.finish_buffered_batch()?; + + let mut out: Vec> = Vec::new(); + while let Some(b) = coalescer.next_completed_batch() { + out.push(Ok(b)); + } + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::iter(out), + ))) + } + + /// Total memory currently held, including all per-partition states. + fn size(&self) -> usize { + size_of::() + + self.row_converter.size() + + self.partition_converter.size() + + self.scratch_rows.size() + + self.partition_scratch_rows.size() + + self.states.values().map(|s| s.size()).sum::() + + self.states.capacity() + * (size_of::() + size_of::()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -2376,4 +2773,397 @@ mod tests { ); Ok(()) } + + // ==================================================================== + // PartitionedTopKRank operator tests + // + // These mirror the PartitionedTopK tests above plus three RANK-specific + // cases for the Equal / boundary-shift / boundary-unchanged-eviction + // arms in `PartitionedTopKRank::insert_batch`. + // ==================================================================== + + /// Builds a `(pk Int32, val Int32)` schema and a `PartitionedTopKRank` + /// keyed on `pk ASC` (partition) and `val ASC` (ORDER BY). + fn build_partitioned_topk_rank( + k: usize, + ) -> Result<(Arc, PartitionedTopKRank)> { + build_partitioned_topk_rank_with_opts(k, SortOptions::default(), false) + } + + /// Variant of [`build_partitioned_topk_rank`] that lets the test pick + /// the `val` column's `SortOptions` (direction, null ordering) and + /// nullability. + fn build_partitioned_topk_rank_with_opts( + k: usize, + val_sort_options: SortOptions, + val_nullable: bool, + ) -> Result<(Arc, PartitionedTopKRank)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("pk", DataType::Int32, false), + Field::new("val", DataType::Int32, val_nullable), + ])); + + let pk_expr: Arc = col("pk", schema.as_ref())?; + let pk_sort_expr = PhysicalSortExpr { + expr: Arc::clone(&pk_expr), + options: SortOptions::default(), + }; + let val_sort_expr = PhysicalSortExpr { + expr: col("val", schema.as_ref())?, + options: val_sort_options, + }; + + let partition_sort_fields = build_sort_fields(&[pk_sort_expr], &schema)?; + let order_expr = LexOrdering::from([val_sort_expr]); + + let state = PartitionedTopKRank::try_new( + 0, + Arc::clone(&schema), + vec![pk_expr], + partition_sort_fields, + order_expr, + k, + 8, // batch_size + &Arc::new(RuntimeEnv::default()), + &ExecutionPlanMetricsSet::new(), + )?; + Ok((schema, state)) + } + + /// Multiple distinct partition keys interleaved within a single + /// input batch — the per-batch demux, per-partition heap eviction, + /// and partition-key-ordered emit must all behave correctly. No + /// ties: result should match a `ROW_NUMBER` top-K under the same K. + #[tokio::test] + async fn test_partitioned_topk_rank_multi_partition_within_batch() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(2)?; + + // pk=1 vals: 10, 5, 8 → top-2 ASC = [5, 8] + // pk=2 vals: 20, 15 → top-2 ASC = [15, 20] + // pk=3 vals: 7 → top-2 ASC = [7] + let batch = + pk_val_batch(&schema, vec![1, 2, 1, 2, 1, 3], vec![10, 20, 5, 15, 8, 7])?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 5 |", + "| 1 | 8 |", + "| 2 | 15 |", + "| 2 | 20 |", + "| 3 | 7 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// State must accumulate across `insert_batch` calls. A row in + /// batch 2 that's strictly better than the existing K-th must + /// evict it; an evicted row whose bytes match the new boundary + /// becomes a `TieEntry` pinned to the prior batch. + #[tokio::test] + async fn test_partitioned_topk_rank_cross_batch_eviction() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(2)?; + + // Batch 1: pk=1 fills the heap with [50, 40]. + state.insert_batch(&pk_val_batch(&schema, vec![1, 1], vec![50, 40])?)?; + + // Batch 2: pk=1 sees a smaller value (10) — it must evict 50; + // 60 > 40 so it's dropped. pk=2 appears mid-stream. + state.insert_batch(&pk_val_batch(&schema, vec![1, 2, 1], vec![10, 99, 60])?)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 10 |", + "| 1 | 40 |", + "| 2 | 99 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// Empty input must produce an empty output stream, not panic. + #[tokio::test] + async fn test_partitioned_topk_rank_empty_input() -> Result<()> { + let (_schema, state) = build_partitioned_topk_rank(3)?; + let results: Vec<_> = state.emit()?.try_collect().await?; + assert!(results.is_empty(), "empty input → empty output"); + Ok(()) + } + + /// `fetch = 1` is a common case (rk = 1 filter) and exercises the + /// boundary-defined-immediately path: after the first admission per + /// partition, `heap.max()` is `Some`, so every subsequent row goes + /// through full Equal/Greater/Less classification. + #[tokio::test] + async fn test_partitioned_topk_rank_fetch_one() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(1)?; + state.insert_batch(&pk_val_batch( + &schema, + vec![1, 1, 2, 2, 3], + vec![3, 1, 9, 4, 7], + )?)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 1 |", + "| 2 | 4 |", + "| 3 | 7 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// `ORDER BY val DESC` exercises the shared encoder's sort-direction + /// handling: the row converter flips the sort sign for `val` so + /// larger values compare smaller in row-encoded form. Each + /// partition keeps its top-K *largest* values. + #[tokio::test] + async fn test_partitioned_topk_rank_desc_ordering() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank_with_opts( + 2, + SortOptions { + descending: true, + nulls_first: false, + }, + false, + )?; + + // pk=1 vals: 10, 5, 8, 12 → top-2 DESC = [12, 10] + // pk=2 vals: 20, 15, 25 → top-2 DESC = [25, 20] + let batch = pk_val_batch( + &schema, + vec![1, 2, 1, 2, 1, 1, 2], + vec![10, 20, 5, 15, 8, 12, 25], + )?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 12 |", + "| 1 | 10 |", + "| 2 | 25 |", + "| 2 | 20 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// NULL sort values exercise the shared encoder's null-ordering + /// handling. With `ASC NULLS LAST`, NULLs sort *after* every + /// non-NULL value, so a partition whose only non-NULL value beats + /// a NULL must evict the NULL when `K = 1`. A partition that holds + /// only NULLs must still emit them. + #[tokio::test] + async fn test_partitioned_topk_rank_nulls_last_ordering() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank_with_opts( + 1, + SortOptions { + descending: false, + nulls_first: false, + }, + true, + )?; + + // pk=1 vals: NULL, 7, NULL → top-1 ASC NULLS LAST = [7] + // pk=2 vals: NULL → top-1 = [NULL] + // pk=3 vals: NULL, 4, 2 → top-1 = [2] + let batch = nullable_pk_val_batch( + &schema, + vec![1, 2, 1, 1, 3, 3, 3], + vec![None, None, Some(7), None, None, Some(4), Some(2)], + )?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 7 |", + "| 2 | |", + "| 3 | 2 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// `ASC NULLS FIRST` (the `SortOptions::default()`) sorts NULLs + /// *before* every non-NULL value, so under `fetch = K` a partition's + /// NULLs are kept preferentially over larger non-NULL values. + #[tokio::test] + async fn test_partitioned_topk_rank_nulls_first_ordering() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank_with_opts( + 2, + SortOptions { + descending: false, + nulls_first: true, + }, + true, + )?; + + // pk=1 vals: NULL, 5, NULL, 8 → top-2 ASC NULLS FIRST = [NULL, NULL] + // pk=2 vals: 7, NULL → top-2 = [NULL, 7] + // pk=3 vals: 3, 1 → top-2 = [1, 3] + let batch = nullable_pk_val_batch( + &schema, + vec![1, 2, 1, 3, 1, 2, 1, 3], + vec![ + None, + Some(7), + Some(5), + Some(3), + None, + None, + Some(8), + Some(1), + ], + )?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | |", + "| 1 | |", + "| 2 | |", + "| 2 | 7 |", + "| 3 | 1 |", + "| 3 | 3 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// RANK-specific: heap fills with K rows tied at the same OB value, + /// then more rows at that same value arrive. They take the Equal arm + /// (heap is full, `heap.max() == row`) and accumulate as ties, while + /// strictly-greater rows are dropped. All retained rows have rank 1. + #[tokio::test] + async fn test_partitioned_topk_rank_boundary_ties_retained() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(2)?; + + // pk=1 vals: 5, 5, 10, 5 + // - first two 5s fill the heap (max=None until heap reaches K=2) + // - third row 10 > 5 → drop (Greater) + // - fourth row 5 == 5 → push to ties (Equal) + // Sorted RANKs: 5→1, 5→1, 5→1, 10→4. WHERE rk ≤ 2 keeps the three 5s. + let batch = pk_val_batch(&schema, vec![1, 1, 1, 1], vec![5, 5, 10, 5])?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 5 |", + "| 1 | 5 |", + "| 1 | 5 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// RANK-specific: heap fills with K rows tied at value V, equal_indices + /// accumulate at V, then a strictly-better row arrives whose admission + /// shifts the boundary strictly below V. The boundary-changed branch + /// must clear both `state.ties` and the in-flight `equal_indices` — + /// otherwise the now-rank-> K rows at value V would leak into output. + #[tokio::test] + async fn test_partitioned_topk_rank_boundary_shifts_clears_ties() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(2)?; + + // pk=1 vals: 10, 10, 10, 5, 3 + // - first two 10s fill heap (max=10) + // - third 10 → Equal → equal_indices=[2] + // - 5 < 10 → admit, evict 10 → heap={5,10}, max=10 (unchanged). + // Push evicted to ties: ties=[10@curr_batch[ev_idx]]. + // - 3 < 10 → admit, evict 10 → heap={3,5}, max=5 (CHANGED). + // Clear ties AND equal_indices. + // Sorted RANKs: 3→1, 5→2, 10→3, 10→3, 10→3. WHERE rk ≤ 2 → [3, 5]. + let batch = pk_val_batch(&schema, vec![1, 1, 1, 1, 1], vec![10, 10, 10, 5, 3])?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 3 |", + "| 1 | 5 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } + + /// RANK-specific: heap has multiple rows at boundary value V, then a + /// strictly-better row arrives. The heap evicts one V (popping + /// `prev_min`), but `heap.max()` is still V — boundary unchanged. + /// The evicted V row must be pushed as a `TieEntry`; without that + /// branch a `rk <= K` query would silently lose a tied row. + #[tokio::test] + async fn test_partitioned_topk_rank_eviction_at_unchanged_boundary() -> Result<()> { + let (schema, mut state) = build_partitioned_topk_rank(2)?; + + // pk=1 vals: 10, 10, 5 + // - first two 10s fill the heap (max=10) + // - 5 < 10 → admit, evict 10. New heap={5,10}, max=10 (unchanged). + // Push the evicted 10 to ties. + // Sorted RANKs: 5→1, 10→2, 10→2. WHERE rk ≤ 2 → all 3 rows. + let batch = pk_val_batch(&schema, vec![1, 1, 1], vec![10, 10, 5])?; + state.insert_batch(&batch)?; + + let results: Vec<_> = state.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+----+-----+", + "| pk | val |", + "+----+-----+", + "| 1 | 5 |", + "| 1 | 10 |", + "| 1 | 10 |", + "+----+-----+", + ], + &results + ); + Ok(()) + } } From 78c0c935b676dafb71cd377ecc99a3d8e9fcc337 Mon Sep 17 00:00:00 2001 From: SubhamSinghal Date: Tue, 30 Jun 2026 23:45:30 +0530 Subject: [PATCH 3/3] Fix build failure --- .../physical-plan/src/sorts/partitioned_topk.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 4491b9a161849..7ff47ea7e8e7e 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -23,10 +23,10 @@ //! FROM t WHERE rn <= N //! ``` //! -//! Instead of sorting the entire dataset, this operator delegates to -//! [`PartitionedTopK`] (for `ROW_NUMBER`) or [`PartitionedTopKRank`] -//! (for `RANK`), both of which maintain one heap per distinct partition -//! key while sharing a single [`arrow::row::RowConverter`], +//! Instead of sorting the entire dataset, this operator delegates to a +//! per-partition heap-of-K implementation (one variant for `ROW_NUMBER` +//! and a sibling variant for `RANK`), both of which maintain one heap per +//! distinct partition key while sharing a single [`arrow::row::RowConverter`], //! [`MemoryReservation`](datafusion_execution::memory_pool::MemoryReservation), //! and metrics set across all partitions, and emit only the top-K rows //! per partition in sorted order `(partition_keys, order_keys)`. @@ -108,9 +108,9 @@ pub enum WindowFnKind { /// ``` /// /// Instead of sorting the entire dataset, this operator reads unsorted input -/// and delegates to [`PartitionedTopK`] (for `ROW_NUMBER`) or -/// [`PartitionedTopKRank`] (for `RANK`), each maintaining one heap per -/// distinct partition key while sharing a single +/// and delegates to a per-partition heap-of-K implementation (`PartitionedTopK` +/// for `ROW_NUMBER` and `PartitionedTopKRank` for `RANK`), each maintaining +/// one heap per distinct partition key while sharing a single /// [`arrow::row::RowConverter`] / /// [`MemoryReservation`](datafusion_execution::memory_pool::MemoryReservation) /// across all partitions, and emits only the top-K rows per partition in