perf(hash-join): eliminate intermediate array allocations in probe-side collision filter#23209
perf(hash-join): eliminate intermediate array allocations in probe-side collision filter#23209LiaCastaneda wants to merge 4 commits into
Conversation
Replace `equal_rows_arr` (Arrow take + eq_dyn_null + FilterBuilder, O(matched_pairs) allocations) with an in-place `JoinKeyComparator` loop. On collision-free build sides — detected once at build time by scanning the `next` chain for adjacent pairs with distinct keys — skip the per-pair recheck entirely: probe rows form consecutive runs in the output buffer, so we check the chain head once and accept/reject the whole run. This cuts key comparisons from F (fanout) per probe row down to 1 on uniform-key build sides, producing a 2.4× speedup on high-fanout string-key joins (Q23, SF100: 1.01s → 0.42s join_time). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ccfebda to
06503c2
Compare
Adds a benchmark query reproducing the high-fanout collision-free join pattern: 32K-row string-key build side (415 distinct keys, ~26-char strings) joined against a 600M-row probe side with a single matching key, yielding ~174M output rows (fanout ~78×). Uses `collect_statistics = false` to let the planner choose Partitioned mode (no row stats → can't prove small build → repartitions both sides), matching production connector behavior. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
06503c2 to
e3b8172
Compare
| /// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`: | ||
| /// row 1 → prev 0: "cat"=="cat" ✓ | ||
| /// row 2 → prev 1: "dog"!="cat" → return true (collision found) | ||
| fn detect_key_collisions<T>( |
There was a problem hiding this comment.
I checked in the other engine (Trino) and there when a build row is inserted, Trino only inserts rows that share the same key, so by construction, every chain is already "pure" and hash collisions are not possible.
This happens because Trino uses a hash table that resolves key equality at insert time. DataFusion's update_from_iter only receives hashes and row indices (it has no access to key values) so it chains all rows in the same hash bucket together regardless of whether they share a key or not.
There was a problem hiding this comment.
Note doing the same in DF would be non trivial because it would require modifying update_from_iter trait signature to accept key columns alongside the hashes, and updating all call sites accordingly.
Which issue does this PR close?
Rationale for this change
When HashJoinExec builds the probe the hash table, it collects all candidate (build_idx, probe_idx) pairs and then rechecks every pair with
equal_rows_arr(shows in the profile, taking 20% of the query CPU) to filter out hash collisions. This recheck materializes intermediate Arrow arrays sized to the number of matched pairs (take+eq_dyn_null+FilterBuilder), making it O(matched_pairs) in both allocations and comparisons.For high-fanout joins (many build rows per distinct key), matched_pairs = probe_rows × fanout. At fanout 78× over 2.3M probe rows this produces ~176M pairs — and the recheck runs on all of them, even though nearly all pass.
This cost is most visible when:
What changes are included in this PR?
Step 1: Replace equal_rows_arr with an in-place
JoinKeyComparatorloop , eliminating the O(matched_pairs) Arrow allocations in the fallback path.Step 2: At build time, scan the next chain once to detect whether the map is collision-free (all adjacent linked pairs share the same key). If so, the probe phase checks once per run of consecutive same-probe-idx pairs and accepts the entire run.
Are these changes tested?
Existing tests pass, I also added some tests for the new function that detects hash collisions
detect_key_collisionsI did some profiling on the added Q23:
We can see the CPU for the HashJoin was cut in half.
Also the benchmarks results show Q23 runs ~x2.5 times faster:
Are there any user-facing changes?
No