diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a5da391ee7635..a88852f4fc53a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -216,9 +216,21 @@ pub(super) struct JoinLeftData { pub(super) probe_side_non_empty: AtomicBool, /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins) pub(super) probe_side_has_null: AtomicBool, + /// `true` if any hash bucket holds build rows with differing join keys + /// (hash collisions). When `false`, every chain is "pure" and the + /// probe side can validate a chain with a single key check at its head + /// instead of re-checking every duplicate. Computed once at build time. + has_key_collisions: bool, } impl JoinLeftData { + /// Returns `true` if the build side has any real hash collisions (a bucket + /// holding rows with differing join keys). When `false`, the probe side can + /// skip the per-duplicate key recheck. See [`Self::has_key_collisions`]. + pub(super) fn has_key_collisions(&self) -> bool { + self.has_key_collisions + } + /// return a reference to the map pub(super) fn map(&self) -> &Map { &self.map @@ -2088,6 +2100,17 @@ async fn collect_left_input( (Map::HashMap(hashmap), batch, left_values) }; + // Detect whether the build side has real hash collisions (a bucket with + // differing keys). When it doesn't, the probe side can validate each chain + // with a single key check at its head instead of re-checking every + // duplicate. + let has_key_collisions = match &join_hash_map { + Map::HashMap(hashmap) => { + hashmap.has_key_collisions(&left_values, null_equality)? + } + Map::ArrayMap(_) => false, + }; + // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { let bitmap_size = bit_util::ceil(batch.num_rows(), 8); @@ -2144,6 +2167,7 @@ async fn collect_left_input( membership, probe_side_non_empty: AtomicBool::new(false), probe_side_has_null: AtomicBool::new(false), + has_key_collisions, }; Ok(data) @@ -4688,6 +4712,8 @@ mod tests { None, 8192, (0, None), + // Exercise the per-pair recheck path. + true, &mut probe_indices_buffer, &mut build_indices_buffer, )?; @@ -4750,6 +4776,8 @@ mod tests { None, 8192, (0, None), + // Exercise the per-pair recheck path. + true, &mut probe_indices_buffer, &mut build_indices_buffer, )?; diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 2aa6e69dff807..2fc35f13c7522 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -33,7 +33,7 @@ use crate::joins::hash_join::shared_bounds::{ PartitionBounds, PartitionBuildData, SharedBuildAccumulator, }; use crate::joins::utils::{ - OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, matchable_join_keys, + JoinKeyComparator, OnceFut, get_final_indices_from_shared_bitmap, matchable_join_keys, }; use crate::stream::EmptyRecordBatchStream; use crate::{ @@ -402,6 +402,7 @@ pub(super) fn lookup_join_hashmap( valid_keys: Option<&NullBuffer>, limit: usize, offset: MapOffset, + has_key_collisions: bool, probe_indices_buffer: &mut Vec, build_indices_buffer: &mut Vec, ) -> Result<(UInt64Array, UInt32Array, Option)> { @@ -414,26 +415,58 @@ pub(super) fn lookup_join_hashmap( build_indices_buffer, ); - let build_indices_unfiltered: UInt64Array = - std::mem::take(build_indices_buffer).into(); - let probe_indices_unfiltered: UInt32Array = - std::mem::take(probe_indices_buffer).into(); - - // TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays - // https://github.com/apache/datafusion/issues/12131 - let (build_indices, probe_indices) = equal_rows_arr( - &build_indices_unfiltered, - &probe_indices_unfiltered, + // Validate the candidate (build, probe) pairs against the join key to drop + // hash collisions. We compare values in place via a prebuilt comparator, + // avoiding the take() + eq_dyn_null() + FilterBuilder allocations that + // equal_rows_arr performs at O(matched_pairs) scale. + // See: https://github.com/apache/datafusion/issues/12131 + let comparator = JoinKeyComparator::for_equality( build_side_values, probe_side_values, null_equality, )?; - // Reclaim buffers - *build_indices_buffer = build_indices_unfiltered.into_parts().1.into(); - *probe_indices_buffer = probe_indices_unfiltered.into_parts().1.into(); + let mut build_out: Vec = Vec::with_capacity(build_indices_buffer.len()); + let mut probe_out: Vec = Vec::with_capacity(probe_indices_buffer.len()); + + if has_key_collisions { + // A bucket may mix keys, so every candidate pair must be rechecked. + for (b, p) in build_indices_buffer.iter().zip(probe_indices_buffer.iter()) { + if comparator.is_equal(*b as usize, *p as usize) { + build_out.push(*b); + probe_out.push(*p); + } + } + } else { + // Collision-free build side: all pairs in one probe row's run share + // the same build key, so checking the key once at the run head + // accepts or rejects the whole run (F comparisons per probe row -> 1). + let builds = build_indices_buffer.as_slice(); + let probes = probe_indices_buffer.as_slice(); + let mut start = 0; + while start < probes.len() { + let probe_idx = probes[start]; + // Find the end of this probe row's run (equal probe indices are + // contiguous). Peek the next index for the common 1:1 case; fall + // back to a binary search for long runs (high fanout). + let end = if start + 1 >= probes.len() || probes[start + 1] != probe_idx { + start + 1 + } else { + start + probes[start..].partition_point(|&p| p == probe_idx) + }; + if comparator.is_equal(builds[start] as usize, probe_idx as usize) { + build_out.extend_from_slice(&builds[start..end]); + probe_out.extend_from_slice(&probes[start..end]); + } + start = end; + } + } + + // Reclaim buffers for the next call + build_indices_buffer.clear(); + probe_indices_buffer.clear(); - Ok((build_indices, probe_indices, next_offset)) + Ok((build_out.into(), probe_out.into(), next_offset)) } /// Counts the number of distinct elements in the input array. @@ -808,6 +841,7 @@ impl HashJoinStream { state.valid_keys.as_ref(), self.batch_size, state.offset, + build_side.left_data.has_key_collisions(), &mut self.probe_indices_buffer, &mut self.build_indices_buffer, )?, diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 454cc916aeb12..eafbec34ecdd7 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -22,9 +22,11 @@ use std::fmt::{self, Debug}; use std::ops::Sub; -use arrow::array::BooleanArray; +use crate::joins::utils::JoinKeyComparator; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowNativeType; +use datafusion_common::{NullEquality, Result}; use hashbrown::HashTable; use hashbrown::hash_table::Entry::{Occupied, Vacant}; @@ -131,6 +133,18 @@ pub trait JoinHashMapType: Send + Sync { match_indices: &mut Vec, ) -> Option; + /// Returns `true` if any bucket holds build rows with differing join keys + /// (real hash collisions). When `false`, the probe can check once per + /// chain head and accept the whole run. Scanned once at build time. + fn has_key_collisions( + &self, + left_values: &[ArrayRef], + null_equality: NullEquality, + ) -> Result { + let _ = (left_values, null_equality); + Ok(true) + } + /// Returns a BooleanArray indicating which of the provided hashes exist in the map. fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray; @@ -208,6 +222,14 @@ impl JoinHashMapType for JoinHashMapU32 { ) } + fn has_key_collisions( + &self, + left_values: &[ArrayRef], + null_equality: NullEquality, + ) -> Result { + detect_key_collisions(&self.next, left_values, null_equality) + } + fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray { contain_hashes(&self.map, hash_values) } @@ -288,6 +310,14 @@ impl JoinHashMapType for JoinHashMapU64 { ) } + fn has_key_collisions( + &self, + left_values: &[ArrayRef], + null_equality: NullEquality, + ) -> Result { + detect_key_collisions(&self.next, left_values, null_equality) + } + fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray { contain_hashes(&self.map, hash_values) } @@ -491,6 +521,40 @@ pub fn contain_hashes(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> Bool BooleanArray::new(buffer, None) } +/// Scans the `next` chain to detect real hash collisions (two build rows in +/// the same bucket with different keys). `next[i]` stores `prev_row + 1` +/// (`0` = end of chain). Checking every adjacent linked pair is sufficient: +/// any two distinct keys in the same bucket must appear as neighbors somewhere. +/// +/// Example — keys `["cat", "cat", "dog"]`, next `[0, 1, 2]`: +/// row 1 → prev 0: "cat"=="cat" ✓ +/// row 2 → prev 1: "dog"!="cat" → return true (collision found) +fn detect_key_collisions( + next: &[T], + left_values: &[ArrayRef], + null_equality: NullEquality, +) -> Result +where + T: ArrowNativeType + Into, +{ + if next.is_empty() { + return Ok(false); + } + let comparator = + JoinKeyComparator::for_equality(left_values, left_values, null_equality)?; + for (row, &link) in next.iter().enumerate() { + let link: u64 = link.into(); + if link != 0 { + // `link` is `prev_row + 1`; both rows live in the same bucket. + let prev = (link - 1) as usize; + if !comparator.is_equal(row, prev) { + return Ok(true); + } + } + } + Ok(false) +} + #[cfg(test)] mod tests { use super::*; @@ -569,4 +633,31 @@ mod tests { assert_eq!(input_indices, vec![1, 1]); assert_eq!(match_indices, vec![3, 1]); } + + #[test] + fn test_has_key_collisions_same_key() -> Result<()> { + // 5 build rows all with key 10 chained in the same bucket — no collision. + // next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end + use arrow::array::Int32Array; + use std::sync::Arc; + let next: Vec = vec![0, 1, 2, 3, 4]; + let map = JoinHashMapU32::new(HashTable::new(), next); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 10, 10, 10])); + assert!(!map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?); + Ok(()) + } + + #[test] + fn test_has_key_collisions_distinct_keys() -> Result<()> { + // 5 build rows, 4 with key 10 and 1 with key 20 buried in the chain. + // next: [0, 1, 2, 3, 4] → chain 4→3→2→1→0→end + // Row 2 has key 20 — adjacent pair (row 2, row 1) differs → collision. + use arrow::array::Int32Array; + use std::sync::Arc; + let next: Vec = vec![0, 1, 2, 3, 4]; + let map = JoinHashMapU32::new(HashTable::new(), next); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 10, 20, 10, 10])); + assert!(map.has_key_collisions(&[keys], NullEquality::NullEqualsNothing)?); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 39a4c178ca4b6..96f127879b41e 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -2332,6 +2332,19 @@ impl JoinKeyComparator { Ok(Self { first, rest }) } + /// Build equality-only comparators for each join key column pair. + /// + /// Unlike [`Self::new`], no `SortOptions` are required — `SortOptions::default()` + /// is used internally, which is correct because callers only test `== Equal`. + pub fn for_equality( + left_arrays: &[ArrayRef], + right_arrays: &[ArrayRef], + null_equality: NullEquality, + ) -> Result { + let sort_options = vec![SortOptions::default(); left_arrays.len()]; + Self::new(left_arrays, right_arrays, &sort_options, null_equality) + } + /// Compare row `left` (in the left arrays) with row `right` (in the right /// arrays). Returns the lexicographic ordering across all key columns. #[inline]