
perf(hash-join): eliminate intermediate array allocations in probe-side collision filter#23209

Open
LiaCastaneda wants to merge 4 commits into
apache:mainfrom
LiaCastaneda:perf/hash-join-no-take-in-probe


@LiaCastaneda LiaCastaneda commented Jun 26, 2026


Which issue does this PR close?

Rationale for this change

When HashJoinExec probes the hash table, it collects all candidate (build_idx, probe_idx) pairs and then rechecks every pair with equal_rows_arr to filter out hash collisions. In the profile, this recheck accounts for about 20% of query CPU. It materializes intermediate Arrow arrays sized to the number of matched pairs (take + eq_dyn_null + FilterBuilder), making it O(matched_pairs) in both allocations and comparisons.

For high-fanout joins (many build rows per distinct key), matched_pairs = probe_rows × fanout. At fanout 78× over 2.3M probe rows this produces ~176M pairs — and the recheck runs on all of them, even though nearly all pass.

This cost is most visible when:

  • The build side has duplicate keys
  • The join output is large (high fanout × large probe side)
  • Hash partition skew concentrates most matched pairs onto a single partition, making the per-partition allocation cost directly observable as query latency

What changes are included in this PR?

Step 1: Replace equal_rows_arr with an in-place JoinKeyComparator loop, eliminating the O(matched_pairs) Arrow allocations in the fallback path.
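To illustrate the idea behind Step 1, here is a minimal sketch of an in-place collision filter. It is not the PR's code: `KeyComparator`, `is_equal`, and `filter_collisions` are hypothetical names, and plain string slices stand in for Arrow key columns. The point is only that the candidate pairs are compacted in the existing buffers, with no intermediate arrays.

```rust
/// Hypothetical stand-in for the PR's `JoinKeyComparator`: compares one
/// build row against one probe row without allocating anything.
struct KeyComparator<'a> {
    build_keys: &'a [&'a str],
    probe_keys: &'a [&'a str],
}

impl<'a> KeyComparator<'a> {
    fn is_equal(&self, build_idx: usize, probe_idx: usize) -> bool {
        self.build_keys[build_idx] == self.probe_keys[probe_idx]
    }
}

/// Compacts the candidate (build_idx, probe_idx) pairs in place, keeping
/// only pairs whose keys really match (dropping hash-collision pairs).
fn filter_collisions(
    cmp: &KeyComparator<'_>,
    build_indices: &mut Vec<u64>,
    probe_indices: &mut Vec<u32>,
) {
    debug_assert_eq!(build_indices.len(), probe_indices.len());
    let mut write = 0;
    for read in 0..build_indices.len() {
        let (b, p) = (build_indices[read], probe_indices[read]);
        if cmp.is_equal(b as usize, p as usize) {
            // Surviving pairs are moved towards the front of the buffers.
            build_indices[write] = b;
            probe_indices[write] = p;
            write += 1;
        }
    }
    build_indices.truncate(write);
    probe_indices.truncate(write);
}
```

The work is still one comparison per matched pair, so this sketch only removes the allocation cost, not the O(matched_pairs) comparisons.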

Step 2: At build time, scan the next chain once to detect whether the map is collision-free (all adjacent linked pairs share the same key). If so, the probe phase checks once per run of consecutive same-probe-idx pairs and accepts the entire run.
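A rough sketch of Step 2 under simplifying assumptions: keys are plain strings, and `next[i]` stores the previous row index plus one, with 0 marking the end of a chain (the convention implied by the `detect_key_collisions` doc comment). The function names `has_key_collisions` and `filter_runs` are illustrative, not the PR's API.

```rust
/// Build-time scan: returns true if any chain link joins two rows whose
/// keys differ, i.e. the map is NOT collision-free.
fn has_key_collisions(keys: &[&str], next: &[u64]) -> bool {
    next.iter()
        .enumerate()
        .any(|(row, &link)| link != 0 && keys[row] != keys[(link - 1) as usize])
}

/// Probe-time filter for a collision-free map: each run of consecutive
/// pairs with the same probe index comes from a single chain, so one key
/// comparison on the first pair accepts or rejects the whole run.
fn filter_runs(
    build_keys: &[&str],
    probe_keys: &[&str],
    build_indices: &mut Vec<u64>,
    probe_indices: &mut Vec<u32>,
) {
    let len = probe_indices.len();
    let (mut read, mut write) = (0, 0);
    while read < len {
        let probe = probe_indices[read];
        // Find the end of the run that shares this probe index.
        let mut end = read + 1;
        while end < len && probe_indices[end] == probe {
            end += 1;
        }
        // One comparison decides the fate of the entire run.
        if build_keys[build_indices[read] as usize] == probe_keys[probe as usize] {
            build_indices.copy_within(read..end, write);
            probe_indices.copy_within(read..end, write);
            write += end - read;
        }
        read = end;
    }
    build_indices.truncate(write);
    probe_indices.truncate(write);
}
```

In this sketch, `filter_runs` is only valid when `has_key_collisions` returned false at build time; otherwise a per-pair check such as the one in Step 1 is still needed.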


Are these changes tested?

Existing tests pass. I also added tests for detect_key_collisions, the new function that detects hash collisions.

I did some profiling on the added Q23:

[Two profiler screenshots]

The profile shows that CPU time in HashJoin was cut roughly in half.
The benchmark results also show Q23 running ~2.4× faster:

  • main: 1.01s
  • this PR: 0.42s

Are there any user-facing changes?

No

@LiaCastaneda LiaCastaneda changed the title perf(hash-join): replace equal_rows_arr with JoinKeyComparator in pro… perf(hash-join): eliminate intermediate array allocations in probe-side collision filter Jun 26, 2026
@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 26, 2026
Replace `equal_rows_arr` (Arrow take + eq_dyn_null + FilterBuilder,
O(matched_pairs) allocations) with an in-place `JoinKeyComparator` loop.
On collision-free build sides — detected once at build time by scanning
the `next` chain for adjacent pairs with distinct keys — skip the per-pair
recheck entirely: probe rows form consecutive runs in the output buffer,
so we check the chain head once and accept/reject the whole run.

This cuts key comparisons from F (fanout) per probe row down to 1 on
uniform-key build sides, producing a 2.4× speedup on high-fanout
string-key joins (Q23, SF100: 1.01s → 0.42s join_time).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a benchmark query reproducing the high-fanout collision-free join
pattern: 32K-row string-key build side (415 distinct keys, ~26-char
strings) joined against a 600M-row probe side with a single matching
key, yielding ~174M output rows (fanout ~78×). Uses
`collect_statistics = false` to let the planner choose Partitioned mode
(no row stats → can't prove small build → repartitions both sides),
matching production connector behavior.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@LiaCastaneda LiaCastaneda force-pushed the perf/hash-join-no-take-in-probe branch from 06503c2 to e3b8172 Compare June 29, 2026 13:22
/// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`:
/// row 1 → prev 0: "cat"=="cat" ✓
/// row 2 → prev 1: "dog"!="cat" → return true (collision found)
fn detect_key_collisions<T>(

Contributor Author


I checked how another engine (Trino) handles this. There, when a build row is inserted, it is only chained with rows that share the same key, so by construction every chain is already "pure" and hash collisions within a chain are not possible.
This happens because Trino uses a hash table that resolves key equality at insert time. DataFusion's update_from_iter only receives hashes and row indices (it has no access to key values) so it chains all rows in the same hash bucket together regardless of whether they share a key or not.
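As a simplified illustration of that last point (not DataFusion's actual `update_from_iter`), the sketch below builds chains from hashes and row indices only. `ChainedIndex`, `from_hashes`, and `chain` are hypothetical names. Because insertion never sees key values, two different keys that share a hash land in the same chain.

```rust
use std::collections::HashMap;

/// Hypothetical chained index: `heads` maps a hash to (last inserted row + 1),
/// and `next[row]` points at (previous row in the chain + 1), 0 = end of chain.
struct ChainedIndex {
    heads: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl ChainedIndex {
    /// Builds the index from hashes alone; key values are never consulted.
    fn from_hashes(hashes: &[u64]) -> Self {
        let mut heads = HashMap::new();
        let mut next = vec![0u64; hashes.len()];
        for (row, &hash) in hashes.iter().enumerate() {
            // `insert` returns the previous head for this hash, if any,
            // which becomes the new row's link.
            if let Some(prev) = heads.insert(hash, row as u64 + 1) {
                next[row] = prev;
            }
        }
        Self { heads, next }
    }

    /// Walks the chain for `hash`, newest row first.
    fn chain(&self, hash: u64) -> Vec<usize> {
        let mut rows = Vec::new();
        let mut link = self.heads.get(&hash).copied().unwrap_or(0);
        while link != 0 {
            let row = (link - 1) as usize;
            rows.push(row);
            link = self.next[row];
        }
        rows
    }
}
```

With rows keyed `["cat", "cat", "dog"]` and an assumed shared hash of 7 for all three, `from_hashes(&[7, 7, 7])` yields `next == [0, 1, 2]`, the mixed-key chain from the doc comment example above.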

Contributor Author


Note that doing the same in DF would be non-trivial: it would require changing the update_from_iter trait method signature to accept key columns alongside the hashes, and updating all call sites accordingly.

@LiaCastaneda LiaCastaneda marked this pull request as ready for review June 29, 2026 15:24


Development

Successfully merging this pull request may close these issues.

High HashJoin probe latency under high fanout and key skew (Partitioned mode)
