diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b1d387ea74557..a11b89175f3eb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1363,6 +1363,7 @@ impl ExecutionPlan for HashJoinExec { filter, on_right, repartition_random_state, + self.null_aware, )) }))) }) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 0af4015ff7239..c8ef965186702 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -37,7 +37,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ - BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit, + BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, IsNullExpr, lit, }; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; @@ -255,6 +255,9 @@ pub(crate) struct SharedBuildAccumulator { repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Null-aware anti join (`NOT IN`). A probe-side NULL must reach the join so its + /// three-valued logic can collapse the result, so the pushed filter keeps NULL rows. 
+ null_aware: bool, } /// Strategy for filter pushdown (decided at collection time) @@ -358,6 +361,7 @@ impl SharedBuildAccumulator { dynamic_filter: Arc, on_right: Vec, repartition_random_state: SeededRandomState, + null_aware: bool, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -404,6 +408,7 @@ impl SharedBuildAccumulator { on_right, repartition_random_state, probe_schema: right_child.schema(), + null_aware, } } @@ -579,7 +584,8 @@ impl SharedBuildAccumulator { if let Some(filter_expr) = combine_membership_and_bounds(membership_expr, bounds_expr) { - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.null_aware_filter(filter_expr))?; } } PartitionStatus::Pending => { @@ -685,12 +691,40 @@ impl SharedBuildAccumulator { )?) as Arc }; - self.dynamic_filter.update(filter_expr)?; + self.dynamic_filter + .update(self.null_aware_filter(filter_expr))?; } } Ok(()) }
+
+    /// Wraps a pushdown filter so a null-aware anti join keeps its probe-side NULL rows.
+    ///
+    /// The build-side predicate drops probe rows whose key is NULL, but `NOT IN` three-valued
+    /// logic needs that NULL to reach the join. OR-ing `probe_key IS NULL` preserves the dynamic
+    /// filter's selectivity for non-NULL rows while letting the NULL through. Returns
+    /// `filter_expr` unchanged when the join is not null-aware.
+    fn null_aware_filter(
+        &self,
+        filter_expr: Arc<dyn PhysicalExpr>,
+    ) -> Arc<dyn PhysicalExpr> {
+        if !self.null_aware {
+            return filter_expr;
+        }
+        debug_assert_eq!(
+            self.on_right.len(),
+            1,
+            "null_aware anti join must have exactly one probe key"
+        );
+        // Fold over the keys instead of indexing `on_right[0]`: the assertion above is compiled
+        // out of release builds, where indexing panics on an empty key list and skips any later
+        // key. Each null check stays on the left, ahead of the costlier dynamic filter.
+        self.on_right.iter().fold(filter_expr, |acc, probe_key| {
+            let is_null = Arc::new(IsNullExpr::new(Arc::clone(probe_key)));
+            Arc::new(BinaryExpr::new(is_null, Operator::Or, acc)) as Arc<dyn PhysicalExpr>
+        })
+    }
 } impl fmt::Debug for SharedBuildAccumulator { @@ -722,6 +756,7 @@ pub(super) fn make_partitioned_accumulator_for_test( on_right: vec![], repartition_random_state: SeededRandomState::with_seed(1), probe_schema, + null_aware: false, } } diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index b18f3b3ae7a99..1d12fc33c9a29 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -451,3 +451,68 @@ DROP TABLE customers_test; statement ok DROP TABLE all_null_banned; + +############# +## Test: dynamic filter pushdown must not drop inner (probe-side) NULLs. +## With join dynamic filter pushdown on, the build-side filter pushed to the probe scan would drop +## inner NULLs, but NOT IN three-valued logic needs them to collapse the result to zero rows. The +## in-memory VALUES scans above never apply the pushed filter, so this case needs a parquet scan. +############# + +statement ok +set datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +# Row-level parquet filtering, so the pushed filter actually drops matching rows instead of only +# pruning row groups. Without this the single row group is read whole and the NULL never gets dropped. 
+statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE TABLE asa_outer(id INT) AS VALUES (1), (2), (3); + +statement ok +CREATE TABLE asa_inner(eid INT) AS VALUES (2), (NULL); + +query I +COPY asa_outer TO 'test_files/scratch/null_aware_anti_join/asa_outer.parquet' STORED AS PARQUET; +---- +3 + +query I +COPY asa_inner TO 'test_files/scratch/null_aware_anti_join/asa_inner.parquet' STORED AS PARQUET; +---- +2 + +statement ok +CREATE EXTERNAL TABLE asa_outer_parquet(id INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_outer.parquet'; + +statement ok +CREATE EXTERNAL TABLE asa_inner_parquet(eid INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/null_aware_anti_join/asa_inner.parquet'; + +# Expected: zero rows. Before the fix the pushed dynamic filter dropped inner NULLs, so the join +# wrongly returned id = 1 and id = 3. +query I +SELECT id FROM asa_outer_parquet WHERE id NOT IN (SELECT eid FROM asa_inner_parquet) ORDER BY id; +---- + +statement ok +DROP TABLE asa_outer; + +statement ok +DROP TABLE asa_inner; + +statement ok +DROP TABLE asa_outer_parquet; + +statement ok +DROP TABLE asa_inner_parquet; + +statement ok +RESET datafusion.execution.parquet.pushdown_filters; + +statement ok +RESET datafusion.optimizer.enable_join_dynamic_filter_pushdown;