From 54cf477bfc127f0343b2c99797fa8e25366eadf9 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Wed, 17 Jun 2026 16:02:10 +0800 Subject: [PATCH 01/10] test: cover the merged key of USING/NATURAL joins Add coverage that the merged key of a USING / NATURAL join is COALESCE(left, right) across SELECT, WHERE, ORDER BY and wildcard expansion, including RIGHT / FULL joins where the left key is NULL-padded. Refs #22881 Signed-off-by: Jiawei Zhao --- .../test_files/join_using_merged_key.slt | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/join_using_merged_key.slt diff --git a/datafusion/sqllogictest/test_files/join_using_merged_key.slt b/datafusion/sqllogictest/test_files/join_using_merged_key.slt new file mode 100644 index 0000000000000..2ad1066414dae --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_using_merged_key.slt @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Merged key of USING / NATURAL joins. +# +# The single merged key column exposed by a USING / NATURAL join equals +# COALESCE(left.k, right.k): for an outer join a row present on only one side +# must expose the key from the present side, not NULL. This matters for +# RIGHT / FULL joins, whose left key is NULL-padded on right-only rows. +# +# Regression coverage for https://github.com/apache/datafusion/issues/22881, +# where the merged key was resolved to the left column unconditionally and so +# came out NULL for right-only rows of RIGHT / FULL joins. +# +# Tables: a has keys {1,2,3}, b has keys {2,3,4}. +# matched: 2, 3 ; left-only: 1 ; right-only: 4 + +statement ok +create table a(k int, x int) as values (1, 10), (2, 20), (3, 30); + +statement ok +create table b(k int, y int) as values (2, 200), (3, 300), (4, 400); + +########## +# Already correct: INNER / LEFT (left key is never NULL-padded) +########## + +query I nosort +SELECT k FROM a INNER JOIN b USING (k) ORDER BY k +---- +2 +3 + +query I nosort +SELECT k FROM a LEFT JOIN b USING (k) ORDER BY k +---- +1 +2 +3 + +########## +# The bug: RIGHT / FULL merged key must come from the preserved side +########## + +query I nosort +SELECT k FROM a RIGHT JOIN b USING (k) ORDER BY k NULLS LAST +---- +2 +3 +4 + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY k NULLS LAST +---- +1 +2 +3 +4 + +query I nosort +SELECT k FROM a NATURAL RIGHT JOIN b ORDER BY k NULLS LAST +---- +2 +3 +4 + +query I nosort +SELECT k FROM a NATURAL FULL JOIN b ORDER BY k NULLS LAST +---- +1 +2 +3 +4 + +########## +# Downstream of the merged key: WHERE must see the coalesced value, not the +# wrong NULL (ORDER BY is covered separately below) +########## + +# right-only row (k = 4) must be findable by its merged key +query III nosort +SELECT k, x, y FROM a FULL JOIN b USING (k) WHERE k = 4 +---- +4 NULL 400 + +########## +# Guards / reference: these are already correct and must stay correct +########## + +# qualified access to each side is independent of the merged key +query II nosort +SELECT a.k, b.k FROM a FULL JOIN b USING (k) ORDER BY coalesce(a.k, b.k) +---- +1 NULL +2 2 +3 3 +NULL 4 + +# the explicit form is the reference for what the merged key should equal +query I nosort +SELECT coalesce(a.k, b.k) AS k FROM a FULL JOIN b ON a.k = b.k ORDER BY k +---- +1 +2 +3 +4 + +########## +# Wildcard over the join: the merged key must appear once and carry the right +# value. (commit 2 target -- the materialized merged column must dedupe under *) +########## + +query III nosort +SELECT * FROM a RIGHT JOIN b USING (k) ORDER BY k NULLS LAST +---- +2 20 200 +3 30 300 +4 NULL 400 + +query III nosort +SELECT * FROM a FULL JOIN b USING (k) ORDER BY k NULLS LAST +---- +1 10 NULL +2 20 200 +3 30 300 +4 NULL 400 + +statement ok +drop table a + +statement ok +drop table b + +########## +# ORDER BY on the merged key: a right-only row with a small key must sort by +# its real value, not as a NULL +########## + +statement ok +create table c(k int, x int) as values (5, 50); + +statement ok +create table d(k int, y int) as values (0, 0), (5, 500); + +query II nosort +SELECT x, k FROM c RIGHT JOIN d USING (k) ORDER BY k +---- +NULL 0 +50 5 + +statement ok +drop table c + +statement ok +drop table d From 8872e24e69503632ccb4f37f14df24a4d0ba1ef6 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Wed, 17 Jun 2026 22:40:29 +0800 Subject: [PATCH 02/10] fix: coalesce the merged key of RIGHT/FULL USING/NATURAL joins A USING / NATURAL join exposes its join key as a single merged column whose value, per the SQL standard, is COALESCE(left, right). DataFusion resolved an unqualified reference to that merged key to the left column unconditionally. For RIGHT and FULL joins the left key is NULL-padded on rows that exist only on the right, so the merged key came out NULL instead of the value that is actually present. That wrong NULL is silent and propagates into ORDER BY and other uses of the key, changing query results. Resolve the unqualified merged key of RIGHT / FULL joins to COALESCE(left, right) so it carries the value from whichever side is present, matching the explicit `coalesce(a.k, b.k) ... ON a.k = b.k` form. INNER / LEFT are unaffected, since their left key is never NULL-padded. Refs #22881 Signed-off-by: Jiawei Zhao --- datafusion/expr/src/expr_rewriter/mod.rs | 71 +++++++++++++++++++++++- datafusion/expr/src/logical_plan/plan.rs | 33 +++++++++++ datafusion/sql/src/select.rs | 9 ++- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index a9a0c156538f9..a18e003de9907 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -22,6 +22,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; +use crate::conditional_expressions::CaseBuilder; use crate::expr::{Alias, Sort, Unnest}; use crate::logical_plan::Projection; use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; @@ -66,12 +67,22 @@ pub trait FunctionRewrite: Debug { /// Recursively call `LogicalPlanBuilder::normalize` on all [`Column`] expressions /// in the `expr` expression tree. +/// +/// An unqualified reference to the merged key of a `RIGHT` / `FULL` +/// `USING` / `NATURAL` join resolves to `COALESCE(left, right)` (expressed as the +/// equivalent `CASE`), since the left key is NULL-padded on right-only rows. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { + let outer_using_keys = plan.outer_using_key_pairs()?; expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { + let was_unqualified = c.relation.is_none(); let col = LogicalPlanBuilder::normalize(plan, c)?; - Transformed::yes(Expr::Column(col)) + Transformed::yes(merged_using_key_or_column( + col, + was_unqualified, + &outer_using_keys, + )?) } else { Transformed::no(expr) } @@ -80,18 +91,67 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { .data() } +/// Resolve a normalized column to itself, or -- when it is the merged key of a +/// `RIGHT` / `FULL` `USING` / `NATURAL` join referenced *unqualified* -- to the +/// equivalent `COALESCE(left, right)`. +/// +/// The COALESCE is built as `CASE WHEN key IS NOT NULL THEN key ELSE other END` +/// (the same form `coalesce` is simplified to, and buildable here without +/// depending on the functions crate), aliased to the key name so the output +/// column keeps its name. +fn merged_using_key_or_column( + col: Column, + was_unqualified: bool, + outer_using_keys: &[(Column, Column)], +) -> Result { + if was_unqualified + && let Some((l, r)) = outer_using_keys + .iter() + .find(|(l, r)| l == &col || r == &col) + { + let other = if l == &col { r } else { l }; + let case = CaseBuilder::new( + None, + vec![Expr::Column(col.clone()).is_not_null()], + vec![Expr::Column(col.clone())], + Some(Box::new(Expr::Column(other.clone()))), + ) + .end()?; + return Ok(case.alias(col.name.clone())); + } + Ok(Expr::Column(col)) +} + /// See [`Column::normalize_with_schemas_and_ambiguity_check`] for usage pub fn normalize_col_with_schemas_and_ambiguity_check( expr: Expr, schemas: &[&[&DFSchema]], using_columns: &[HashSet], +) -> Result { + normalize_col_with_schemas_ambiguity_and_outer_using( + expr, + schemas, + using_columns, + &[], + ) +} + +/// Like [`normalize_col_with_schemas_and_ambiguity_check`], but additionally +/// resolves the merged key of a `RIGHT` / `FULL` `USING` / `NATURAL` join (given +/// as `(left, right)` column pairs) to `COALESCE(left, right)`. +pub fn normalize_col_with_schemas_ambiguity_and_outer_using( + expr: Expr, + schemas: &[&[&DFSchema]], + using_columns: &[HashSet], + outer_using_keys: &[(Column, Column)], ) -> Result { // Normalize column inside Unnest if let Expr::Unnest(Unnest { expr }) = expr { - let e = normalize_col_with_schemas_and_ambiguity_check( + let e = normalize_col_with_schemas_ambiguity_and_outer_using( expr.as_ref().clone(), schemas, using_columns, + outer_using_keys, )?; return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); } @@ -99,9 +159,14 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { + let was_unqualified = c.relation.is_none(); let col = c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; - Transformed::yes(Expr::Column(col)) + Transformed::yes(merged_using_key_or_column( + col, + was_unqualified, + outer_using_keys, + )?) } else { Transformed::no(expr) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c154bc7c92fa5..54b801d19d458 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -525,6 +525,39 @@ impl LogicalPlan { Ok(using_columns) } + /// Returns the `(left, right)` join-key column pairs of every `RIGHT` / + /// `FULL` `USING` / `NATURAL` join in this plan. + /// + /// For these join types the left key is NULL-padded on rows that exist only + /// on the right, so an unqualified reference to the merged key must resolve + /// to `COALESCE(left, right)` rather than to the left column alone. + pub fn outer_using_key_pairs( + &self, + ) -> Result, DataFusionError> { + let mut pairs = vec![]; + + self.apply_with_subqueries(|plan| { + if let LogicalPlan::Join(Join { + join_constraint: JoinConstraint::Using, + join_type: JoinType::Right | JoinType::Full, + on, + .. + }) = plan + { + for (l, r) in on { + if let (Some(l), Some(r)) = + (l.get_as_join_column(), r.get_as_join_column()) + { + pairs.push((l.to_owned(), r.to_owned())); + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + Ok(pairs) + } + /// returns the first output expression of this `LogicalPlan` node. pub fn head_output_expr(&self) -> Result> { match self { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index ba7353c424f4e..0400c37355925 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -38,7 +38,8 @@ use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ - normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, + normalize_col, normalize_col_with_schemas_ambiguity_and_outer_using, + normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, }; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::utils::{ @@ -973,10 +974,11 @@ impl SqlToRel<'_, S> { match sql { SelectItem::UnnamedExpr(expr) => { let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let col = normalize_col_with_schemas_and_ambiguity_check( + let col = normalize_col_with_schemas_ambiguity_and_outer_using( expr, &[&[plan.schema()]], &plan.using_columns()?, + &plan.outer_using_key_pairs()?, )?; Ok(SelectExpr::Expression(col)) @@ -984,10 +986,11 @@ impl SqlToRel<'_, S> { SelectItem::ExprWithAlias { expr, alias } => { let select_expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let col = normalize_col_with_schemas_and_ambiguity_check( + let col = normalize_col_with_schemas_ambiguity_and_outer_using( select_expr, &[&[plan.schema()]], &plan.using_columns()?, + &plan.outer_using_key_pairs()?, )?; let name = self.ident_normalizer.normalize(alias); // avoiding adding an alias if the column name is the same. From c204969d4af44dca248f1f5bef71afacd3d29304 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Wed, 17 Jun 2026 22:52:41 +0800 Subject: [PATCH 03/10] fix: resolve the USING/NATURAL merged key in WHERE clauses A USING / NATURAL merged key can be referenced unqualified anywhere a column can, including in WHERE. But WHERE resolved unqualified references against only the columns the predicate itself mentions, not the join's USING columns, so an unqualified merged-key reference matched both the left and right key with no USING context and was rejected as ambiguous. Resolve WHERE predicates the same way as the SELECT list -- against the join's real USING columns -- so the merged key is recognized and, for RIGHT / FULL joins, takes the value from whichever side is present (COALESCE(left, right)) instead of erroring. Refs #22881 Signed-off-by: Jiawei Zhao --- datafusion/sql/src/select.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 0400c37355925..f2c404e5efb7d 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -38,12 +38,11 @@ use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ - normalize_col, normalize_col_with_schemas_ambiguity_and_outer_using, - normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, + normalize_col, normalize_col_with_schemas_ambiguity_and_outer_using, normalize_sorts, }; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, + expr_as_column_expr, find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, @@ -880,22 +879,26 @@ impl SqlToRel<'_, S> { ); } - let mut using_columns = HashSet::new(); - expr_to_columns(&filter_expr, &mut using_columns)?; + // Resolve unqualified references against the real USING / NATURAL + // join columns so the merged key is recognized (and, for + // RIGHT / FULL, coalesced) rather than reported as ambiguous. + let using_columns = plan.using_columns()?; + let outer_using_keys = plan.outer_using_key_pairs()?; let mut schema_stack: Vec> = vec![vec![plan.schema()], fallback_schemas]; for sc in planner_context.outer_schemas_iter() { schema_stack.push(vec![sc.as_ref()]); } - let filter_expr = normalize_col_with_schemas_and_ambiguity_check( + let filter_expr = normalize_col_with_schemas_ambiguity_and_outer_using( filter_expr, schema_stack .iter() .map(|sc| sc.as_slice()) .collect::>() .as_slice(), - &[using_columns], + &using_columns, + &outer_using_keys, )?; Ok(LogicalPlan::Filter(Filter::try_new( From adaa73470226a7385500295c9c22ad913c1ce9b0 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Wed, 17 Jun 2026 23:01:05 +0800 Subject: [PATCH 04/10] fix: coalesce the merged USING/NATURAL key in wildcard expansion A wildcard (`SELECT *`) over a USING / NATURAL join collapses the duplicated join key to a single merged-key column. That surviving column was the left key, which is NULL-padded on rows present only on the right of a RIGHT / FULL join, so `SELECT *` showed NULL for the merged key on those rows. Expand the merged key in a wildcard as COALESCE(left, right) for RIGHT / FULL joins so it shows the value from whichever side is present, consistent with how an explicit reference to the key resolves and with the SQL standard. Closes #22881 Signed-off-by: Jiawei Zhao --- datafusion/expr/src/expr_rewriter/mod.rs | 2 +- datafusion/expr/src/utils.rs | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index a18e003de9907..881f5a43cf4f7 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -99,7 +99,7 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { /// (the same form `coalesce` is simplified to, and buildable here without /// depending on the functions crate), aliased to the key name so the output /// column keeps its name. -fn merged_using_key_or_column( +pub(crate) fn merged_using_key_or_column( col: Column, was_unqualified: bool, outer_using_keys: &[(Column, Column)], diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 22abb454d4e6b..1c2cf2b7e2261 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -22,7 +22,7 @@ use std::collections::{BTreeSet, HashSet}; use std::sync::Arc; use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams}; -use crate::expr_rewriter::strip_outer_reference; +use crate::expr_rewriter::{merged_using_key_or_column, strip_outer_reference}; use crate::{ BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, and, }; @@ -461,7 +461,22 @@ pub fn expand_wildcard( }; // Add each excluded `Column` to columns_to_skip columns_to_skip.extend(excluded_columns); - Ok(get_exprs_except_skipped(schema, &columns_to_skip)) + let exprs = get_exprs_except_skipped(schema, &columns_to_skip); + + // For RIGHT / FULL USING / NATURAL joins the surviving key column is the + // merged key; expose it as COALESCE(left, right) so a wildcard shows the + // value from whichever side is present rather than the NULL-padded side. + let outer_using_keys = plan.outer_using_key_pairs()?; + if outer_using_keys.is_empty() { + return Ok(exprs); + } + exprs + .into_iter() + .map(|expr| match expr { + Expr::Column(col) => merged_using_key_or_column(col, true, &outer_using_keys), + other => Ok(other), + }) + .collect() } /// Resolves an `Expr::Wildcard` to a collection of qualified `Expr::Column`'s. From 4bd4325539cf5a8fee07e906ee2e318551f005cc Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Thu, 18 Jun 2026 10:05:50 +0800 Subject: [PATCH 05/10] refactor: resolve USING/NATURAL merged key by join type Resolve an unqualified reference to the merged key of a USING / NATURAL join to the side that is never NULL-padded, addressing review feedback that always coalescing handled LEFT and RIGHT asymmetrically: the right key for RIGHT, COALESCE(left, right) for FULL, and the left key (unchanged) for INNER / LEFT. For RIGHT this yields a plain column instead of a computed COALESCE, so the merged key no longer collides with a qualified key in the same projection schema -- queries like `ORDER BY a.k` or `SELECT k, a.k` over a RIGHT join now plan cleanly. FULL still needs the computed COALESCE and keeps that limitation. Also drop the public normalize_col_with_schemas_ambiguity_and_outer_using and resolve the merged key in the SQL planner instead, so the USING-outer-join detail no longer leaks into the generic expr public API. Rename outer_using_key_pairs to right_or_full_using_key_pairs (it never covered LEFT) and carry the join type. Correct two stale integration snapshots to the right-side merged key. Refs #22881 Signed-off-by: Jiawei Zhao --- datafusion/expr/src/expr_rewriter/mod.rs | 91 +++++++++++------------- datafusion/expr/src/logical_plan/plan.rs | 26 ++++--- datafusion/expr/src/utils.rs | 11 +-- datafusion/sql/src/select.rs | 70 ++++++++++++++---- datafusion/sql/tests/sql_integration.rs | 5 +- 5 files changed, 122 insertions(+), 81 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 881f5a43cf4f7..d05d280ba3155 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -30,7 +30,7 @@ use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; use datafusion_common::TableReference; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{Column, DFSchema, Result}; +use datafusion_common::{Column, DFSchema, JoinType, Result}; mod guarantees; pub use guarantees::GuaranteeRewriter; @@ -69,10 +69,11 @@ pub trait FunctionRewrite: Debug { /// in the `expr` expression tree. /// /// An unqualified reference to the merged key of a `RIGHT` / `FULL` -/// `USING` / `NATURAL` join resolves to `COALESCE(left, right)` (expressed as the -/// equivalent `CASE`), since the left key is NULL-padded on right-only rows. +/// `USING` / `NATURAL` join is resolved to the never-NULL-padded side -- the +/// right key for `RIGHT`, `COALESCE(left, right)` for `FULL` -- via +/// [`merged_using_key_or_column`]. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { - let outer_using_keys = plan.outer_using_key_pairs()?; + let merged_keys = plan.right_or_full_using_key_pairs()?; expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { @@ -81,7 +82,7 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { Transformed::yes(merged_using_key_or_column( col, was_unqualified, - &outer_using_keys, + &merged_keys, )?) } else { Transformed::no(expr) @@ -93,31 +94,42 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { /// Resolve a normalized column to itself, or -- when it is the merged key of a /// `RIGHT` / `FULL` `USING` / `NATURAL` join referenced *unqualified* -- to the -/// equivalent `COALESCE(left, right)`. +/// value of the side that is never NULL-padded. /// -/// The COALESCE is built as `CASE WHEN key IS NOT NULL THEN key ELSE other END` -/// (the same form `coalesce` is simplified to, and buildable here without -/// depending on the functions crate), aliased to the key name so the output -/// column keeps its name. -pub(crate) fn merged_using_key_or_column( +/// `merged_keys` carries the `(left, right, join_type)` triples of the plan's +/// `RIGHT` / `FULL` USING / NATURAL joins. The merged key resolves to the right +/// key for `RIGHT`, and to `COALESCE(left, right)` for `FULL`. The COALESCE is +/// built as `CASE WHEN left IS NOT NULL THEN left ELSE right END` (the form +/// `coalesce` is simplified to, buildable here without depending on the +/// functions crate), aliased to the key name so the output column keeps its +/// name. `LEFT` / `INNER` never reach here -- their merged key is the left +/// column the normalization already produced. +pub fn merged_using_key_or_column( col: Column, was_unqualified: bool, - outer_using_keys: &[(Column, Column)], + merged_keys: &[(Column, Column, JoinType)], ) -> Result { if was_unqualified - && let Some((l, r)) = outer_using_keys - .iter() - .find(|(l, r)| l == &col || r == &col) + && let Some((l, r, join_type)) = + merged_keys.iter().find(|(l, r, _)| l == &col || r == &col) { - let other = if l == &col { r } else { l }; - let case = CaseBuilder::new( - None, - vec![Expr::Column(col.clone()).is_not_null()], - vec![Expr::Column(col.clone())], - Some(Box::new(Expr::Column(other.clone()))), - ) - .end()?; - return Ok(case.alias(col.name.clone())); + match join_type { + // The left key is NULL-padded on right-only rows; the right key is + // always present, so the merged key is just the right column. + JoinType::Right => return Ok(Expr::Column(r.clone())), + // Either side may be NULL-padded; coalesce to the present one. + JoinType::Full => { + let case = CaseBuilder::new( + None, + vec![Expr::Column(l.clone()).is_not_null()], + vec![Expr::Column(l.clone())], + Some(Box::new(Expr::Column(r.clone()))), + ) + .end()?; + return Ok(case.alias(col.name.clone())); + } + _ => {} + } } Ok(Expr::Column(col)) } @@ -127,31 +139,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( expr: Expr, schemas: &[&[&DFSchema]], using_columns: &[HashSet], -) -> Result { - normalize_col_with_schemas_ambiguity_and_outer_using( - expr, - schemas, - using_columns, - &[], - ) -} - -/// Like [`normalize_col_with_schemas_and_ambiguity_check`], but additionally -/// resolves the merged key of a `RIGHT` / `FULL` `USING` / `NATURAL` join (given -/// as `(left, right)` column pairs) to `COALESCE(left, right)`. -pub fn normalize_col_with_schemas_ambiguity_and_outer_using( - expr: Expr, - schemas: &[&[&DFSchema]], - using_columns: &[HashSet], - outer_using_keys: &[(Column, Column)], ) -> Result { // Normalize column inside Unnest if let Expr::Unnest(Unnest { expr }) = expr { - let e = normalize_col_with_schemas_ambiguity_and_outer_using( + let e = normalize_col_with_schemas_and_ambiguity_check( expr.as_ref().clone(), schemas, using_columns, - outer_using_keys, )?; return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); } @@ -159,14 +153,9 @@ pub fn normalize_col_with_schemas_ambiguity_and_outer_using( expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { - let was_unqualified = c.relation.is_none(); - let col = - c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; - Transformed::yes(merged_using_key_or_column( - col, - was_unqualified, - outer_using_keys, - )?) + Transformed::yes(Expr::Column( + c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?, + )) } else { Transformed::no(expr) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 54b801d19d458..017ebad29942c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -525,21 +525,25 @@ impl LogicalPlan { Ok(using_columns) } - /// Returns the `(left, right)` join-key column pairs of every `RIGHT` / - /// `FULL` `USING` / `NATURAL` join in this plan. + /// Returns the `(left, right, join_type)` join-key column triples of every + /// `RIGHT` / `FULL` `USING` / `NATURAL` join in this plan. /// - /// For these join types the left key is NULL-padded on rows that exist only - /// on the right, so an unqualified reference to the merged key must resolve - /// to `COALESCE(left, right)` rather than to the left column alone. - pub fn outer_using_key_pairs( + /// For these join types the merged key is not simply the left column: the + /// left key is NULL-padded on right-only rows. An unqualified reference to + /// the merged key must therefore resolve to the side that is never + /// NULL-padded -- the right key for `RIGHT`, or `COALESCE(left, right)` for + /// `FULL` (where either side may be padded). `LEFT` / `INNER` are omitted: + /// their merged key is the left column, which the normal resolution already + /// picks. + pub fn right_or_full_using_key_pairs( &self, - ) -> Result, DataFusionError> { - let mut pairs = vec![]; + ) -> Result, DataFusionError> { + let mut triples = vec![]; self.apply_with_subqueries(|plan| { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, - join_type: JoinType::Right | JoinType::Full, + join_type: join_type @ (JoinType::Right | JoinType::Full), on, .. }) = plan @@ -548,14 +552,14 @@ impl LogicalPlan { if let (Some(l), Some(r)) = (l.get_as_join_column(), r.get_as_join_column()) { - pairs.push((l.to_owned(), r.to_owned())); + triples.push((l.to_owned(), r.to_owned(), *join_type)); } } } Ok(TreeNodeRecursion::Continue) })?; - Ok(pairs) + Ok(triples) } /// returns the first output expression of this `LogicalPlan` node. diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 1c2cf2b7e2261..da1428de1567c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -464,16 +464,17 @@ pub fn expand_wildcard( let exprs = get_exprs_except_skipped(schema, &columns_to_skip); // For RIGHT / FULL USING / NATURAL joins the surviving key column is the - // merged key; expose it as COALESCE(left, right) so a wildcard shows the - // value from whichever side is present rather than the NULL-padded side. - let outer_using_keys = plan.outer_using_key_pairs()?; - if outer_using_keys.is_empty() { + // merged key; resolve it to the never-NULL-padded side (the right key for + // RIGHT, COALESCE(left, right) for FULL) so a wildcard shows the value from + // whichever side is present rather than the NULL-padded one. + let merged_keys = plan.right_or_full_using_key_pairs()?; + if merged_keys.is_empty() { return Ok(exprs); } exprs .into_iter() .map(|expr| match expr { - Expr::Column(col) => merged_using_key_or_column(col, true, &outer_using_keys), + Expr::Column(col) => merged_using_key_or_column(col, true, &merged_keys), other => Ok(other), }) .collect() diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index f2c404e5efb7d..12c9a5f291d1e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -31,14 +31,18 @@ use crate::utils::{ use arrow::datatypes::DataType; use datafusion_common::error::DataFusionErrorBuilder; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; +use datafusion_common::{ + Column, DFSchema, DFSchemaRef, JoinType, Result, not_impl_err, plan_err, +}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; -use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; +use datafusion_expr::expr::{PlannedReplaceSelectItem, Unnest, WildcardOptions}; use datafusion_expr::expr_rewriter::{ - normalize_col, normalize_col_with_schemas_ambiguity_and_outer_using, normalize_sorts, + merged_using_key_or_column, normalize_col, normalize_sorts, }; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::utils::{ @@ -90,6 +94,47 @@ fn flatten_expr_groups(expr_groups: Vec>) -> Vec { expr_groups.into_iter().flatten().collect() } +/// Normalize unqualified column references in `expr` against `schemas`, then +/// resolve the merged key of any RIGHT / FULL USING / NATURAL join to the side +/// that is never NULL-padded (see [`merged_using_key_or_column`]). +/// +/// This is a single pass on purpose: only references written *unqualified* are +/// merged keys, and that distinction is lost once normalization has qualified +/// them, so the merged-key resolution has to happen as each column is resolved. +fn normalize_col_resolving_merged_using_key( + expr: Expr, + schemas: &[&[&DFSchema]], + using_columns: &[HashSet], + merged_keys: &[(Column, Column, JoinType)], +) -> Result { + // Normalize column inside Unnest + if let Expr::Unnest(Unnest { expr }) = expr { + let e = normalize_col_resolving_merged_using_key( + expr.as_ref().clone(), + schemas, + using_columns, + merged_keys, + )?; + return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); + } + + expr.transform(|expr| { + Ok(if let Expr::Column(c) = expr { + let was_unqualified = c.relation.is_none(); + let col = + c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; + Transformed::yes(merged_using_key_or_column( + col, + was_unqualified, + merged_keys, + )?) + } else { + Transformed::no(expr) + }) + }) + .data() +} + impl SqlToRel<'_, S> { /// Generate a logic plan from an SQL select pub(super) fn select_to_plan( @@ -881,16 +926,17 @@ impl SqlToRel<'_, S> { // Resolve unqualified references against the real USING / NATURAL // join columns so the merged key is recognized (and, for - // RIGHT / FULL, coalesced) rather than reported as ambiguous. + // RIGHT / FULL, resolved to the preserved side) rather than + // reported as ambiguous. let using_columns = plan.using_columns()?; - let outer_using_keys = plan.outer_using_key_pairs()?; + let merged_keys = plan.right_or_full_using_key_pairs()?; let mut schema_stack: Vec> = vec![vec![plan.schema()], fallback_schemas]; for sc in planner_context.outer_schemas_iter() { schema_stack.push(vec![sc.as_ref()]); } - let filter_expr = normalize_col_with_schemas_ambiguity_and_outer_using( + let filter_expr = normalize_col_resolving_merged_using_key( filter_expr, schema_stack .iter() @@ -898,7 +944,7 @@ impl SqlToRel<'_, S> { .collect::>() .as_slice(), &using_columns, - &outer_using_keys, + &merged_keys, )?; Ok(LogicalPlan::Filter(Filter::try_new( @@ -977,11 +1023,11 @@ impl SqlToRel<'_, S> { match sql { SelectItem::UnnamedExpr(expr) => { let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let col = normalize_col_with_schemas_ambiguity_and_outer_using( + let col = normalize_col_resolving_merged_using_key( expr, &[&[plan.schema()]], &plan.using_columns()?, - &plan.outer_using_key_pairs()?, + &plan.right_or_full_using_key_pairs()?, )?; Ok(SelectExpr::Expression(col)) @@ -989,11 +1035,11 @@ impl SqlToRel<'_, S> { SelectItem::ExprWithAlias { expr, alias } => { let select_expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let col = normalize_col_with_schemas_ambiguity_and_outer_using( + let col = normalize_col_resolving_merged_using_key( select_expr, &[&[plan.schema()]], &plan.using_columns()?, - &plan.outer_using_key_pairs()?, + &plan.right_or_full_using_key_pairs()?, )?; let name = self.ident_normalizer.normalize(alias); // avoiding adding an alias if the column name is the same. diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 88b7b43eb73f6..d63f0556f7a8d 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1108,7 +1108,7 @@ fn natural_right_join() { assert_snapshot!( plan, @r" - Projection: a.l_item_id + Projection: b.l_item_id Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem @@ -5201,7 +5201,8 @@ fn test_using_join_wildcard_schema() { [ "t1.a".to_string(), "t1.b".to_string(), - "t2.c".to_string(), + // RIGHT join: the merged key `c` is the never-NULL-padded right side + "t3.c".to_string(), "t3.d".to_string() ] ); From 1ef52916b758c9680a13be0948c7d46d3bc16561 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 22 Jun 2026 23:14:47 +0800 Subject: [PATCH 06/10] fix: order by a qualified key alongside a FULL USING/NATURAL merged key A FULL USING / NATURAL join exposes its merged key as an unqualified COALESCE(left, right) column. Ordering by a qualified key of the same join (`ORDER BY a.k`) folds that qualified `a.k` into the projection that already carries the unqualified merged `k`, building an illegal `{ k, a.k }` schema, so such queries errored at plan time. During sort push-down, rename a colliding merged key to a reserved internal name before folding the qualified sort column, and restore the original name in the wrapper projection. References to the merged key in the sort itself (`ORDER BY a.k, k`) are rewritten to the same internal name so multi-key sorts keep resolving; that shape was legal before the merged-key work, so this avoids a regression. Teach the unparser to flatten a wrapper projection that purely renames an inner column over a Sort, so the rewritten plan still round-trips through plan -> SQL -> plan. Refs #22881 Signed-off-by: Jiawei Zhao --- datafusion/expr/src/logical_plan/builder.rs | 157 +++++++++++++++++- datafusion/sql/src/unparser/rewrite.rs | 29 +++- datafusion/sql/tests/cases/plan_to_sql.rs | 43 +++++ .../test_files/join_using_merged_key.slt | 42 +++++ 4 files changed, 263 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2ecb12c30afad..be0c3f8b3fa2b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -53,6 +53,7 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; use datafusion_common::metadata::FieldMetadata; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, exec_err, @@ -130,6 +131,11 @@ pub struct LogicalPlanBuilder { } impl LogicalPlanBuilder { + /// Reserved name prefix for a FULL USING / NATURAL join merged key that has + /// been renamed to dodge an unqualified/qualified schema collision during + /// sort push-down. A real column carrying this prefix would clash. + const MERGED_KEY_NAME_PREFIX: &str = "__datafusion_merged_key_"; + /// Create a builder from an existing plan pub fn new(plan: LogicalPlan) -> Self { Self { @@ -819,6 +825,97 @@ impl LogicalPlanBuilder { self.sort_with_limit(sorts, None) } + /// Internal name for a FULL USING / NATURAL join merged key that has been + /// renamed to dodge an unqualified/qualified schema collision during sort + /// push-down. See [`Self::sort_with_limit`]. + fn synthetic_merged_name(name: &str) -> String { + format!("{}{name}", Self::MERGED_KEY_NAME_PREFIX) + } + + /// Unqualified projection output fields that collide with a *qualified* + /// missing sort column of the same name. In practice these are the + /// `COALESCE` merged keys of FULL USING / NATURAL joins, exposed + /// unqualified, which `ORDER BY ` would otherwise drag into + /// an illegal `{ k, a.k }` projection schema. They are renamed to a + /// synthetic name before folding; any unqualified reference to them in the + /// sort itself is rewritten to match (see [`Self::rewrite_sort_merged_keys`]). + fn renamable_merged_keys( + schema: &DFSchemaRef, + missing_cols: &IndexSet, + ) -> HashSet { + let qualified_missing: HashSet<&str> = missing_cols + .iter() + .filter(|c| c.relation.is_some()) + .map(|c| c.name.as_str()) + .collect(); + if qualified_missing.is_empty() { + return HashSet::new(); + } + + schema + .columns() + .into_iter() + .filter(|c| { + c.relation.is_none() && qualified_missing.contains(c.name.as_str()) + }) + .map(|c| c.name) + .collect() + } + + /// Re-alias a renamable merged key output field to its + /// [`Self::synthetic_merged_name`] so it no longer collides with the + /// qualified sort column folded alongside it. + fn rename_merged_key(expr: Expr, renamable: &HashSet) -> Expr { + match expr { + Expr::Alias(mut alias) + if alias.relation.is_none() && renamable.contains(&alias.name) => + { + alias.name = Self::synthetic_merged_name(&alias.name); + Expr::Alias(alias) + } + Expr::Column(column) + if column.relation.is_none() && renamable.contains(&column.name) => + { + let name = Self::synthetic_merged_name(&column.name); + Expr::Column(column).alias(name) + } + other => other, + } + } + + /// Rewrite unqualified references to a renamed merged key inside the sort + /// expressions to its [`Self::synthetic_merged_name`], so a sort that also + /// orders by the merged key (e.g. `ORDER BY a.k, k`) keeps resolving after + /// the projection field has been renamed. + fn rewrite_sort_merged_keys( + sorts: Vec, + renamable: &HashSet, + ) -> Result> { + sorts + .into_iter() + .map(|sort| { + let expr = sort + .expr + .clone() + .transform(|e| { + Ok(match e { + Expr::Column(c) + if c.relation.is_none() + && renamable.contains(&c.name) => + { + Transformed::yes(Expr::Column(Column::new_unqualified( + Self::synthetic_merged_name(&c.name), + ))) + } + other => Transformed::no(other), + }) + }) + .data()?; + Ok(sort.with_expr(expr)) + }) + .collect() + } + /// Apply a sort pub fn sort_with_limit( self, @@ -852,15 +949,63 @@ impl LogicalPlanBuilder { }))); } + // A qualified sort column folded into the projection can collide with an + // unqualified output field of the same name: the COALESCE merged key of a + // FULL USING / NATURAL join is exposed unqualified, so `ORDER BY a.k` + // dragging the qualified `a.k` into the `SELECT k` projection would build + // an illegal `{ k, a.k }` schema. Rename such merged keys to a unique + // internal name before folding, and restore the original name below in + // the wrapper projection. + let renamable = match self.plan.as_ref() { + LogicalPlan::Projection(_) => { + Self::renamable_merged_keys(schema, &missing_cols) + } + _ => HashSet::new(), + }; + + // Keep the sort's own references to a renamed merged key resolving by + // rewriting them to the same synthetic name (e.g. `ORDER BY a.k, k`, + // where `k` is the merged key being renamed to dodge the `a.k` clash). + let sorts = if renamable.is_empty() { + sorts + } else { + Self::rewrite_sort_merged_keys(sorts, &renamable)? + }; + // remove pushed down sort columns - let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); + let new_expr = schema + .columns() + .into_iter() + .map(|c| { + if c.relation.is_none() && renamable.contains(&c.name) { + Expr::Column(Column::new_unqualified(Self::synthetic_merged_name( + &c.name, + ))) + .alias(c.name) + } else { + Expr::Column(c) + } + }) + .collect(); + + let input = if renamable.is_empty() { + Arc::unwrap_or_clone(self.plan) + } else { + match Arc::unwrap_or_clone(self.plan) { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let expr = expr + .into_iter() + .map(|e| Self::rename_merged_key(e, &renamable)) + .collect::>(); + LogicalPlan::Projection(Projection::try_new(expr, input)?) + } + // `renamable` is only non-empty for a Projection (checked above). + other => other, + } + }; let is_distinct = false; - let plan = Self::add_missing_columns( - Arc::unwrap_or_clone(self.plan), - &missing_cols, - is_distinct, - )?; + let plan = Self::add_missing_columns(input, &missing_cols, is_distinct)?; let sort_plan = LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &plan)?, diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 6ee66f61938f0..c4a23618a896d 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -220,7 +220,20 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( }) .collect::>(); - let mut collects = p.expr.clone(); + let mut collects = p + .expr + .iter() + .map(|e| match e { + // A pure rename `Column(x) AS out` compares as its underlying + // column `x`, so an outer Projection that only renames an inner + // column (e.g. the restored merged key of a FULL USING / NATURAL + // join) still matches and can be flattened instead of nested. + Expr::Alias(alias) if matches!(alias.expr.as_ref(), Expr::Column(_)) => { + alias.expr.as_ref().clone() + } + _ => e.clone(), + }) + .collect::>(); for sort in &sort.expr { // Strip aliases from sort expressions so the comparison matches // the inner Projection's raw expressions. The optimizer may add @@ -250,7 +263,19 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( let new_exprs = p .expr .iter() - .map(|e| map.get(e).unwrap_or(e).clone()) + .map(|e| { + // For a pure rename `Column(x) AS out`, inline the inner + // definition of `x` and re-apply the outer output name, so the + // flattened SELECT keeps `out` as the column name. + if let Expr::Alias(alias) = e + && matches!(alias.expr.as_ref(), Expr::Column(_)) + && let Some(inner) = map.get(alias.expr.as_ref()) + { + inner.clone().unalias().alias(alias.name.clone()) + } else { + map.get(e).unwrap_or(e).clone() + } + }) .collect::>(); // The inner Projection may define aliases that the Sort references diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 937ed9894fdfa..01cde263adba0 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -46,6 +46,7 @@ use datafusion_sql::unparser::dialect::{ }; use datafusion_sql::unparser::{Unparser, expr_to_sql, plan_to_sql}; use insta::assert_snapshot; +use rstest::rstest; use sqlparser::ast::Statement; use std::hash::Hash; use std::ops::Add; @@ -260,6 +261,48 @@ fn roundtrip_statement() -> Result<()> { Ok(()) } +/// Regression for https://github.com/apache/datafusion/issues/22881. +/// +/// A FULL JOIN USING / NATURAL merged key is `COALESCE(left, right)`, exposed +/// as an *unqualified* column. Selecting that merged key while ordering by a +/// *qualified* key of the same join (`ORDER BY ta.id`) must both plan and +/// survive an unparser round-trip (`plan -> SQL -> plan`). Earlier fixes either +/// errored at plan time or produced SQL that could not be re-planned. `USING` +/// and `NATURAL` are covered separately since the unparser renders them +/// differently. +#[rstest] +#[case::full_join_using( + "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ + FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id) \ + ORDER BY ta.id" +)] +#[case::natural_full_join( + "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ + NATURAL FULL JOIN (SELECT j2_id AS id FROM j2) tb \ + ORDER BY ta.id" +)] +fn roundtrip_full_join_merged_key_order_by_qualified(#[case] query: &str) -> Result<()> { + let dialect = GenericDialect {}; + let statement = Parser::new(&dialect) + .try_with_sql(query)? + .parse_statement()?; + + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); + + let roundtrip_statement = plan_to_sql(&plan)?; + let plan_roundtrip = sql_to_rel + .sql_statement_to_plan(roundtrip_statement) + .unwrap(); + + assert_eq!(plan, plan_roundtrip); + + Ok(()) +} + #[test] fn roundtrip_crossjoin() -> Result<()> { let query = "select j1.j1_id, j2.j2_string from j1, j2"; diff --git a/datafusion/sqllogictest/test_files/join_using_merged_key.slt b/datafusion/sqllogictest/test_files/join_using_merged_key.slt index 2ad1066414dae..d7c951438b246 100644 --- a/datafusion/sqllogictest/test_files/join_using_merged_key.slt +++ b/datafusion/sqllogictest/test_files/join_using_merged_key.slt @@ -119,6 +119,48 @@ SELECT coalesce(a.k, b.k) AS k FROM a FULL JOIN b ON a.k = b.k ORDER BY k 3 4 +########## +# Regression: ORDER BY a *qualified* key while selecting the FULL merged key. +# The merged k is COALESCE(a.k, b.k), exposed unqualified; ORDER BY a.k folds +# the qualified a.k into the same projection. This must plan (it regressed to a +# schema-ambiguity error) and sort by a.k, not by the merged key: the right-only +# row (a.k IS NULL) sorts last, the rest descend by a.k -- so the output differs +# from ORDER BY k DESC (which would be 4, 3, 2, 1). +########## + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY a.k DESC NULLS LAST +---- +3 +2 +1 +4 + +query I nosort +SELECT k FROM a NATURAL FULL JOIN b ORDER BY a.k DESC NULLS LAST +---- +3 +2 +1 +4 + +########## +# Regression: a *qualified* key and the *unqualified* merged key together in one +# ORDER BY (`ORDER BY a.k DESC NULLS LAST, k`). On main this was legal (the merged +# key resolved to a.k, so both sort keys were a.k); the merged-key fix must keep +# it legal -- renaming the merged key to dodge the `{k, a.k}` collision must also +# rewrite the unqualified `k` reference in the sort. a.k is unique here so the +# secondary `k` never breaks a tie, but the shape still exercises that path. +########## + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY a.k DESC NULLS LAST, k +---- +3 +2 +1 +4 + ########## # Wildcard over the join: the merged key must appear once and carry the right # value. (commit 2 target -- the materialized merged column must dedupe under *) From 9de0ff4c1c78a730a9cdfa67c4aec8837764a132 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 29 Jun 2026 11:41:50 +0800 Subject: [PATCH 07/10] fix: select merged USING key alongside qualified side keys A USING / NATURAL join exposes one merged key for a shared join column, ut the original side keys remain addressable through qualified references. Selecting the merged key next to a qualified side key, such as `SELECT k, a.k FROM a LEFT JOIN b USING (k)`, previously built a projection whose two output expressions resolved to the same qualified name and failed DataFusion's unique-field check. Detect direct unaliased projection items that select both the merged key and a qualified side key of the same USING / NATURAL join. Internally qualify only the merged-key output with the reserved merged-key qualifier so the projection schema remains distinct while preserving the user-visible column name. Teach the unparser to render that reserved merged-key qualifier back as the original unqualified key, keeping plan -> SQL -> plan round trips valid. Signed-off-by: Jiawei Zhao --- datafusion/expr/src/logical_plan/builder.rs | 7 +- datafusion/expr/src/logical_plan/plan.rs | 26 +++++ datafusion/sql/src/select.rs | 105 +++++++++++++++++- datafusion/sql/src/unparser/plan.rs | 13 +++ datafusion/sql/tests/cases/plan_to_sql.rs | 37 ++++++ .../test_files/join_using_merged_key.slt | 23 ++++ 6 files changed, 204 insertions(+), 7 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index be0c3f8b3fa2b..6e4a0062bb8fb 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -131,10 +131,9 @@ pub struct LogicalPlanBuilder { } impl LogicalPlanBuilder { - /// Reserved name prefix for a FULL USING / NATURAL join merged key that has - /// been renamed to dodge an unqualified/qualified schema collision during - /// sort push-down. A real column carrying this prefix would clash. - const MERGED_KEY_NAME_PREFIX: &str = "__datafusion_merged_key_"; + /// Reserved prefix for an internal FULL USING / NATURAL join merged key name + /// or qualifier. A real column or relation carrying this prefix would clash. + pub const MERGED_KEY_NAME_PREFIX: &str = "__datafusion_merged_key_"; /// Create a builder from an existing plan pub fn new(plan: LogicalPlan) -> Self { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 017ebad29942c..c0520fa5f3e10 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -525,6 +525,32 @@ impl LogicalPlan { Ok(using_columns) } + /// Returns the `(left, right)` join-key column pairs of every `USING` / + /// `NATURAL` join in this plan. + pub fn using_key_pairs(&self) -> Result, DataFusionError> { + let mut pairs = vec![]; + + self.apply_with_subqueries(|plan| { + if let LogicalPlan::Join(Join { + join_constraint: JoinConstraint::Using, + on, + .. + }) = plan + { + for (l, r) in on { + if let (Some(l), Some(r)) = + (l.get_as_join_column(), r.get_as_join_column()) + { + pairs.push((l.to_owned(), r.to_owned())); + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + Ok(pairs) + } + /// Returns the `(left, right, join_type)` join-key column triples of every /// `RIGHT` / `FULL` `USING` / `NATURAL` join in this plan. /// diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 12c9a5f291d1e..d2dfc3f819010 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -19,7 +19,9 @@ use std::collections::HashSet; use std::ops::ControlFlow; use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ + ContextProvider, PlannerContext, SqlToRel, idents_to_table_reference, +}; use crate::query::to_order_by_exprs_with_select; use crate::utils::{ CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose, @@ -35,7 +37,8 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, JoinType, Result, not_impl_err, plan_err, + Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, not_impl_err, + plan_err, }; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; @@ -94,6 +97,13 @@ fn flatten_expr_groups(expr_groups: Vec>) -> Vec { expr_groups.into_iter().flatten().collect() } +fn merged_key_qualifier(name: &str) -> TableReference { + TableReference::bare(format!( + "{}{name}", + LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX + )) +} + /// Normalize unqualified column references in `expr` against `schemas`, then /// resolve the merged key of any RIGHT / FULL USING / NATURAL join to the side /// that is never NULL-padded (see [`merged_using_key_or_column`]). @@ -1002,9 +1012,17 @@ impl SqlToRel<'_, S> { ) -> Result> { let mut prepared_select_exprs = vec![]; let mut error_builder = DataFusionErrorBuilder::new(); + let internally_qualified_merged_keys = + self.internally_qualified_merged_keys(&projection, plan)?; for expr in projection { - match self.sql_select_to_rex(expr, plan, empty_from, planner_context) { + match self.sql_select_to_rex( + expr, + plan, + empty_from, + planner_context, + &internally_qualified_merged_keys, + ) { Ok(expr) => prepared_select_exprs.push(expr), Err(err) => error_builder.add_error(err), } @@ -1012,6 +1030,69 @@ impl SqlToRel<'_, S> { error_builder.error_or(prepared_select_exprs) } + /// Returns the direct column reference from an unaliased SQL projection item. + /// `SelectItem` is the sqlparser AST node for one `SELECT` item; this helper + /// only extracts simple references such as `k` or `a.k`. + fn direct_select_column(&self, select_item: &SelectItem) -> Result> { + let SelectItem::UnnamedExpr(expr) = select_item else { + return Ok(None); + }; + + match expr { + SQLExpr::Identifier(ident) => Ok(Some(Column::new_unqualified( + self.ident_normalizer.normalize(ident.clone()), + ))), + SQLExpr::CompoundIdentifier(idents) if idents.len() >= 2 => { + let mut qualifier_idents = idents.clone(); + let name = self + .ident_normalizer + .normalize(qualifier_idents.pop().expect("checked len")); + let relation = idents_to_table_reference( + qualifier_idents, + self.options.enable_ident_normalization, + )?; + Ok(Some(Column::new(Some(relation), name))) + } + _ => Ok(None), + } + } + + fn internally_qualified_merged_keys( + &self, + projection: &[SelectItem], + plan: &LogicalPlan, + ) -> Result> { + let using_key_pairs = plan.using_key_pairs()?; + if using_key_pairs.is_empty() { + return Ok(HashSet::new()); + } + + let mut unqualified_select_keys = HashSet::new(); + let mut qualified_select_key_names = HashSet::new(); + for select_item in projection { + if let Some(column) = self.direct_select_column(select_item)? { + if column.relation.is_none() { + unqualified_select_keys.insert(column.name); + } else { + qualified_select_key_names.insert(column.name); + } + } + } + + let mut conflicting_merged_keys = HashSet::new(); + for (left, _right) in using_key_pairs { + // `USING` / `NATURAL` join keys have the same visible merged-key + // name on both sides. Keep `left.name` as the canonical output name. + if unqualified_select_keys.contains(&left.name) + && qualified_select_key_names.contains(&left.name) + { + conflicting_merged_keys.insert(left.name); + } + } + + Ok(conflicting_merged_keys) + } + /// Generate a relational expression from a select SQL expression fn sql_select_to_rex( &self, @@ -1019,16 +1100,33 @@ impl SqlToRel<'_, S> { plan: &LogicalPlan, empty_from: bool, planner_context: &mut PlannerContext, + internally_qualified_merged_keys: &HashSet, ) -> Result { match sql { SelectItem::UnnamedExpr(expr) => { let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; + let internal_merged_key_name = match &expr { + Expr::Column(column) + if column.relation.is_none() + && internally_qualified_merged_keys + .contains(&column.name) => + { + Some(column.name.clone()) + } + _ => None, + }; let col = normalize_col_resolving_merged_using_key( expr, &[&[plan.schema()]], &plan.using_columns()?, &plan.right_or_full_using_key_pairs()?, )?; + let col = if let Some(name) = internal_merged_key_name { + col.unalias() + .alias_qualified(Some(merged_key_qualifier(&name)), name) + } else { + col + }; Ok(SelectExpr::Expression(col)) } @@ -1140,6 +1238,7 @@ impl SqlToRel<'_, S> { plan, empty_from, planner_context, + &HashSet::new(), ) }) .collect::>>()? diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 37ff8145d3fd2..653d9805f11b4 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -2151,6 +2151,19 @@ impl Unparser<'_> { fn select_item_to_sql(&self, expr: &Expr) -> Result { match expr { + Expr::Alias(Alias { + relation: Some(relation), + name, + .. + }) if relation + .table() + .strip_prefix(LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX) + == Some(name.as_str()) => + { + Ok(ast::SelectItem::UnnamedExpr(ast::Expr::Identifier( + self.new_ident_quoted_if_needs(name.to_string()), + ))) + } Expr::Alias(Alias { expr, name, .. }) => { let inner = self.expr_to_sql(expr)?; diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 01cde263adba0..fb5b314f109df 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -303,6 +303,43 @@ fn roundtrip_full_join_merged_key_order_by_qualified(#[case] query: &str) -> Res Ok(()) } +#[rstest] +#[case::left_join_using( + "SELECT id, ta.id FROM (SELECT j1_id AS id FROM j1) ta \ + LEFT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +#[case::right_join_using( + "SELECT id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ + RIGHT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +#[case::full_join_using( + "SELECT id, ta.id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ + FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +fn roundtrip_join_using_merged_key_with_qualified_side_keys( + #[case] query: &str, +) -> Result<()> { + let dialect = GenericDialect {}; + let statement = Parser::new(&dialect) + .try_with_sql(query)? + .parse_statement()?; + + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); + + let roundtrip_statement = plan_to_sql(&plan)?; + let plan_roundtrip = sql_to_rel + .sql_statement_to_plan(roundtrip_statement) + .unwrap(); + + assert_eq!(plan, plan_roundtrip); + + Ok(()) +} + #[test] fn roundtrip_crossjoin() -> Result<()> { let query = "select j1.j1_id, j2.j2_string from j1, j2"; diff --git a/datafusion/sqllogictest/test_files/join_using_merged_key.slt b/datafusion/sqllogictest/test_files/join_using_merged_key.slt index d7c951438b246..98df116cf26a3 100644 --- a/datafusion/sqllogictest/test_files/join_using_merged_key.slt +++ b/datafusion/sqllogictest/test_files/join_using_merged_key.slt @@ -110,6 +110,29 @@ SELECT a.k, b.k FROM a FULL JOIN b USING (k) ORDER BY coalesce(a.k, b.k) 3 3 NULL 4 +# the merged key is distinct from an explicitly qualified side key +query II rowsort +SELECT k, a.k FROM a LEFT JOIN b USING (k) +---- +1 1 +2 2 +3 3 + +query II rowsort +SELECT k, b.k FROM a RIGHT JOIN b USING (k) +---- +2 2 +3 3 +4 4 + +query III rowsort +SELECT k, a.k, b.k FROM a FULL JOIN b USING (k) +---- +1 1 NULL +2 2 2 +3 3 3 +4 NULL 4 + # the explicit form is the reference for what the merged key should equal query I nosort SELECT coalesce(a.k, b.k) AS k FROM a FULL JOIN b ON a.k = b.k ORDER BY k From e5db60de79b7c0e70249a68cede6411e46f2e94a Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 29 Jun 2026 13:16:12 +0800 Subject: [PATCH 08/10] refactor: give USING merged keys an internal identity Resolve unqualified USING / NATURAL merged-key references to an internally qualified merged-key expression. The expression uses the left key for left-preserving joins, the right key for right-preserving joins, and CASE / COALESCE semantics for FULL joins, while preserving the user-visible key name. This removes late merged-key rewrite handling from SELECT and sort push-down paths because the merged key now carries a distinct internal identity during normalization. Refs #22881 Signed-off-by: Jiawei Zhao --- datafusion/expr/src/expr_rewriter/mod.rs | 59 ++++---- datafusion/expr/src/logical_plan/builder.rs | 146 +------------------- datafusion/expr/src/logical_plan/plan.rs | 40 +----- datafusion/expr/src/utils.rs | 8 +- datafusion/sql/src/select.rs | 119 ++-------------- datafusion/sql/src/unparser/rewrite.rs | 3 +- datafusion/sql/tests/cases/plan_to_sql.rs | 45 ++---- 7 files changed, 61 insertions(+), 359 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index d05d280ba3155..f406ae8568dbf 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -68,12 +68,11 @@ pub trait FunctionRewrite: Debug { /// Recursively call `LogicalPlanBuilder::normalize` on all [`Column`] expressions /// in the `expr` expression tree. /// -/// An unqualified reference to the merged key of a `RIGHT` / `FULL` -/// `USING` / `NATURAL` join is resolved to the never-NULL-padded side -- the -/// right key for `RIGHT`, `COALESCE(left, right)` for `FULL` -- via +/// An unqualified reference to the merged key of a `USING` / `NATURAL` join is +/// resolved to a reserved internal merged-key expression via /// [`merged_using_key_or_column`]. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { - let merged_keys = plan.right_or_full_using_key_pairs()?; + let merged_keys = plan.using_key_pairs()?; expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { @@ -92,18 +91,23 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { .data() } +fn merged_key_qualifier(name: &str) -> TableReference { + TableReference::bare(format!( + "{}{name}", + LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX + )) +} + /// Resolve a normalized column to itself, or -- when it is the merged key of a -/// `RIGHT` / `FULL` `USING` / `NATURAL` join referenced *unqualified* -- to the -/// value of the side that is never NULL-padded. +/// `USING` / `NATURAL` join referenced *unqualified* -- to the merged key +/// expression with a reserved internal qualifier. /// /// `merged_keys` carries the `(left, right, join_type)` triples of the plan's -/// `RIGHT` / `FULL` USING / NATURAL joins. The merged key resolves to the right -/// key for `RIGHT`, and to `COALESCE(left, right)` for `FULL`. The COALESCE is -/// built as `CASE WHEN left IS NOT NULL THEN left ELSE right END` (the form -/// `coalesce` is simplified to, buildable here without depending on the -/// functions crate), aliased to the key name so the output column keeps its -/// name. `LEFT` / `INNER` never reach here -- their merged key is the left -/// column the normalization already produced. +/// USING / NATURAL joins. The merged key resolves to the left key for +/// left-preserving joins, the right key for right-preserving joins, and +/// `COALESCE(left, right)` for `FULL`. The COALESCE is built as +/// `CASE WHEN left IS NOT NULL THEN left ELSE right END` (the form `coalesce` +/// is simplified to, buildable here without depending on the functions crate). pub fn merged_using_key_or_column( col: Column, was_unqualified: bool, @@ -113,23 +117,24 @@ pub fn merged_using_key_or_column( && let Some((l, r, join_type)) = merged_keys.iter().find(|(l, r, _)| l == &col || r == &col) { - match join_type { + let name = col.name.clone(); + let expr = match join_type { // The left key is NULL-padded on right-only rows; the right key is // always present, so the merged key is just the right column. - JoinType::Right => return Ok(Expr::Column(r.clone())), - // Either side may be NULL-padded; coalesce to the present one. - JoinType::Full => { - let case = CaseBuilder::new( - None, - vec![Expr::Column(l.clone()).is_not_null()], - vec![Expr::Column(l.clone())], - Some(Box::new(Expr::Column(r.clone()))), - ) - .end()?; - return Ok(case.alias(col.name.clone())); + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + Expr::Column(r.clone()) } - _ => {} - } + // Either side may be NULL-padded; coalesce to the present one. + JoinType::Full => CaseBuilder::new( + None, + vec![Expr::Column(l.clone()).is_not_null()], + vec![Expr::Column(l.clone())], + Some(Box::new(Expr::Column(r.clone()))), + ) + .end()?, + _ => Expr::Column(l.clone()), + }; + return Ok(expr.alias_qualified(Some(merged_key_qualifier(&name)), name)); } Ok(Expr::Column(col)) } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6e4a0062bb8fb..610325d9bd362 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -53,7 +53,6 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; use datafusion_common::metadata::FieldMetadata; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, exec_err, @@ -824,97 +823,6 @@ impl LogicalPlanBuilder { self.sort_with_limit(sorts, None) } - /// Internal name for a FULL USING / NATURAL join merged key that has been - /// renamed to dodge an unqualified/qualified schema collision during sort - /// push-down. See [`Self::sort_with_limit`]. - fn synthetic_merged_name(name: &str) -> String { - format!("{}{name}", Self::MERGED_KEY_NAME_PREFIX) - } - - /// Unqualified projection output fields that collide with a *qualified* - /// missing sort column of the same name. In practice these are the - /// `COALESCE` merged keys of FULL USING / NATURAL joins, exposed - /// unqualified, which `ORDER BY ` would otherwise drag into - /// an illegal `{ k, a.k }` projection schema. They are renamed to a - /// synthetic name before folding; any unqualified reference to them in the - /// sort itself is rewritten to match (see [`Self::rewrite_sort_merged_keys`]). - fn renamable_merged_keys( - schema: &DFSchemaRef, - missing_cols: &IndexSet, - ) -> HashSet { - let qualified_missing: HashSet<&str> = missing_cols - .iter() - .filter(|c| c.relation.is_some()) - .map(|c| c.name.as_str()) - .collect(); - if qualified_missing.is_empty() { - return HashSet::new(); - } - - schema - .columns() - .into_iter() - .filter(|c| { - c.relation.is_none() && qualified_missing.contains(c.name.as_str()) - }) - .map(|c| c.name) - .collect() - } - - /// Re-alias a renamable merged key output field to its - /// [`Self::synthetic_merged_name`] so it no longer collides with the - /// qualified sort column folded alongside it. - fn rename_merged_key(expr: Expr, renamable: &HashSet) -> Expr { - match expr { - Expr::Alias(mut alias) - if alias.relation.is_none() && renamable.contains(&alias.name) => - { - alias.name = Self::synthetic_merged_name(&alias.name); - Expr::Alias(alias) - } - Expr::Column(column) - if column.relation.is_none() && renamable.contains(&column.name) => - { - let name = Self::synthetic_merged_name(&column.name); - Expr::Column(column).alias(name) - } - other => other, - } - } - - /// Rewrite unqualified references to a renamed merged key inside the sort - /// expressions to its [`Self::synthetic_merged_name`], so a sort that also - /// orders by the merged key (e.g. `ORDER BY a.k, k`) keeps resolving after - /// the projection field has been renamed. - fn rewrite_sort_merged_keys( - sorts: Vec, - renamable: &HashSet, - ) -> Result> { - sorts - .into_iter() - .map(|sort| { - let expr = sort - .expr - .clone() - .transform(|e| { - Ok(match e { - Expr::Column(c) - if c.relation.is_none() - && renamable.contains(&c.name) => - { - Transformed::yes(Expr::Column(Column::new_unqualified( - Self::synthetic_merged_name(&c.name), - ))) - } - other => Transformed::no(other), - }) - }) - .data()?; - Ok(sort.with_expr(expr)) - }) - .collect() - } - /// Apply a sort pub fn sort_with_limit( self, @@ -948,60 +856,10 @@ impl LogicalPlanBuilder { }))); } - // A qualified sort column folded into the projection can collide with an - // unqualified output field of the same name: the COALESCE merged key of a - // FULL USING / NATURAL join is exposed unqualified, so `ORDER BY a.k` - // dragging the qualified `a.k` into the `SELECT k` projection would build - // an illegal `{ k, a.k }` schema. Rename such merged keys to a unique - // internal name before folding, and restore the original name below in - // the wrapper projection. - let renamable = match self.plan.as_ref() { - LogicalPlan::Projection(_) => { - Self::renamable_merged_keys(schema, &missing_cols) - } - _ => HashSet::new(), - }; - - // Keep the sort's own references to a renamed merged key resolving by - // rewriting them to the same synthetic name (e.g. `ORDER BY a.k, k`, - // where `k` is the merged key being renamed to dodge the `a.k` clash). - let sorts = if renamable.is_empty() { - sorts - } else { - Self::rewrite_sort_merged_keys(sorts, &renamable)? - }; - // remove pushed down sort columns - let new_expr = schema - .columns() - .into_iter() - .map(|c| { - if c.relation.is_none() && renamable.contains(&c.name) { - Expr::Column(Column::new_unqualified(Self::synthetic_merged_name( - &c.name, - ))) - .alias(c.name) - } else { - Expr::Column(c) - } - }) - .collect(); + let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); - let input = if renamable.is_empty() { - Arc::unwrap_or_clone(self.plan) - } else { - match Arc::unwrap_or_clone(self.plan) { - LogicalPlan::Projection(Projection { expr, input, .. }) => { - let expr = expr - .into_iter() - .map(|e| Self::rename_merged_key(e, &renamable)) - .collect::>(); - LogicalPlan::Projection(Projection::try_new(expr, input)?) - } - // `renamable` is only non-empty for a Projection (checked above). - other => other, - } - }; + let input = Arc::unwrap_or_clone(self.plan); let is_distinct = false; let plan = Self::add_missing_columns(input, &missing_cols, is_distinct)?; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c0520fa5f3e10..605d85000cbd2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -525,43 +525,9 @@ impl LogicalPlan { Ok(using_columns) } - /// Returns the `(left, right)` join-key column pairs of every `USING` / - /// `NATURAL` join in this plan. - pub fn using_key_pairs(&self) -> Result, DataFusionError> { - let mut pairs = vec![]; - - self.apply_with_subqueries(|plan| { - if let LogicalPlan::Join(Join { - join_constraint: JoinConstraint::Using, - on, - .. - }) = plan - { - for (l, r) in on { - if let (Some(l), Some(r)) = - (l.get_as_join_column(), r.get_as_join_column()) - { - pairs.push((l.to_owned(), r.to_owned())); - } - } - } - Ok(TreeNodeRecursion::Continue) - })?; - - Ok(pairs) - } - /// Returns the `(left, right, join_type)` join-key column triples of every - /// `RIGHT` / `FULL` `USING` / `NATURAL` join in this plan. - /// - /// For these join types the merged key is not simply the left column: the - /// left key is NULL-padded on right-only rows. An unqualified reference to - /// the merged key must therefore resolve to the side that is never - /// NULL-padded -- the right key for `RIGHT`, or `COALESCE(left, right)` for - /// `FULL` (where either side may be padded). `LEFT` / `INNER` are omitted: - /// their merged key is the left column, which the normal resolution already - /// picks. - pub fn right_or_full_using_key_pairs( + /// `USING` / `NATURAL` join in this plan. + pub fn using_key_pairs( &self, ) -> Result, DataFusionError> { let mut triples = vec![]; @@ -569,7 +535,7 @@ impl LogicalPlan { self.apply_with_subqueries(|plan| { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, - join_type: join_type @ (JoinType::Right | JoinType::Full), + join_type, on, .. }) = plan diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index da1428de1567c..fd67f2dbb8968 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -463,11 +463,9 @@ pub fn expand_wildcard( columns_to_skip.extend(excluded_columns); let exprs = get_exprs_except_skipped(schema, &columns_to_skip); - // For RIGHT / FULL USING / NATURAL joins the surviving key column is the - // merged key; resolve it to the never-NULL-padded side (the right key for - // RIGHT, COALESCE(left, right) for FULL) so a wildcard shows the value from - // whichever side is present rather than the NULL-padded one. - let merged_keys = plan.right_or_full_using_key_pairs()?; + // Resolve the surviving USING / NATURAL key column to the internally named + // merged key expression. + let merged_keys = plan.using_key_pairs()?; if merged_keys.is_empty() { return Ok(exprs); } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index d2dfc3f819010..962be6af83131 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -19,9 +19,7 @@ use std::collections::HashSet; use std::ops::ControlFlow; use std::sync::Arc; -use crate::planner::{ - ContextProvider, PlannerContext, SqlToRel, idents_to_table_reference, -}; +use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::query::to_order_by_exprs_with_select; use crate::utils::{ CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose, @@ -37,8 +35,7 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, not_impl_err, - plan_err, + Column, DFSchema, DFSchemaRef, JoinType, Result, not_impl_err, plan_err, }; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; @@ -97,16 +94,9 @@ fn flatten_expr_groups(expr_groups: Vec>) -> Vec { expr_groups.into_iter().flatten().collect() } -fn merged_key_qualifier(name: &str) -> TableReference { - TableReference::bare(format!( - "{}{name}", - LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX - )) -} - /// Normalize unqualified column references in `expr` against `schemas`, then -/// resolve the merged key of any RIGHT / FULL USING / NATURAL join to the side -/// that is never NULL-padded (see [`merged_using_key_or_column`]). +/// resolve any USING / NATURAL merged key to its internally qualified merged-key +/// expression (see [`merged_using_key_or_column`]). /// /// This is a single pass on purpose: only references written *unqualified* are /// merged keys, and that distinction is lost once normalization has qualified @@ -935,11 +925,10 @@ impl SqlToRel<'_, S> { } // Resolve unqualified references against the real USING / NATURAL - // join columns so the merged key is recognized (and, for - // RIGHT / FULL, resolved to the preserved side) rather than + // join columns so the merged key is recognized rather than // reported as ambiguous. let using_columns = plan.using_columns()?; - let merged_keys = plan.right_or_full_using_key_pairs()?; + let merged_keys = plan.using_key_pairs()?; let mut schema_stack: Vec> = vec![vec![plan.schema()], fallback_schemas]; for sc in planner_context.outer_schemas_iter() { @@ -1012,17 +1001,8 @@ impl SqlToRel<'_, S> { ) -> Result> { let mut prepared_select_exprs = vec![]; let mut error_builder = DataFusionErrorBuilder::new(); - let internally_qualified_merged_keys = - self.internally_qualified_merged_keys(&projection, plan)?; - for expr in projection { - match self.sql_select_to_rex( - expr, - plan, - empty_from, - planner_context, - &internally_qualified_merged_keys, - ) { + match self.sql_select_to_rex(expr, plan, empty_from, planner_context) { Ok(expr) => prepared_select_exprs.push(expr), Err(err) => error_builder.add_error(err), } @@ -1030,69 +1010,6 @@ impl SqlToRel<'_, S> { error_builder.error_or(prepared_select_exprs) } - /// Returns the direct column reference from an unaliased SQL projection item. - /// `SelectItem` is the sqlparser AST node for one `SELECT` item; this helper - /// only extracts simple references such as `k` or `a.k`. - fn direct_select_column(&self, select_item: &SelectItem) -> Result> { - let SelectItem::UnnamedExpr(expr) = select_item else { - return Ok(None); - }; - - match expr { - SQLExpr::Identifier(ident) => Ok(Some(Column::new_unqualified( - self.ident_normalizer.normalize(ident.clone()), - ))), - SQLExpr::CompoundIdentifier(idents) if idents.len() >= 2 => { - let mut qualifier_idents = idents.clone(); - let name = self - .ident_normalizer - .normalize(qualifier_idents.pop().expect("checked len")); - let relation = idents_to_table_reference( - qualifier_idents, - self.options.enable_ident_normalization, - )?; - Ok(Some(Column::new(Some(relation), name))) - } - _ => Ok(None), - } - } - - fn internally_qualified_merged_keys( - &self, - projection: &[SelectItem], - plan: &LogicalPlan, - ) -> Result> { - let using_key_pairs = plan.using_key_pairs()?; - if using_key_pairs.is_empty() { - return Ok(HashSet::new()); - } - - let mut unqualified_select_keys = HashSet::new(); - let mut qualified_select_key_names = HashSet::new(); - for select_item in projection { - if let Some(column) = self.direct_select_column(select_item)? { - if column.relation.is_none() { - unqualified_select_keys.insert(column.name); - } else { - qualified_select_key_names.insert(column.name); - } - } - } - - let mut conflicting_merged_keys = HashSet::new(); - for (left, _right) in using_key_pairs { - // `USING` / `NATURAL` join keys have the same visible merged-key - // name on both sides. Keep `left.name` as the canonical output name. - if unqualified_select_keys.contains(&left.name) - && qualified_select_key_names.contains(&left.name) - { - conflicting_merged_keys.insert(left.name); - } - } - - Ok(conflicting_merged_keys) - } - /// Generate a relational expression from a select SQL expression fn sql_select_to_rex( &self, @@ -1100,33 +1017,16 @@ impl SqlToRel<'_, S> { plan: &LogicalPlan, empty_from: bool, planner_context: &mut PlannerContext, - internally_qualified_merged_keys: &HashSet, ) -> Result { match sql { SelectItem::UnnamedExpr(expr) => { let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let internal_merged_key_name = match &expr { - Expr::Column(column) - if column.relation.is_none() - && internally_qualified_merged_keys - .contains(&column.name) => - { - Some(column.name.clone()) - } - _ => None, - }; let col = normalize_col_resolving_merged_using_key( expr, &[&[plan.schema()]], &plan.using_columns()?, - &plan.right_or_full_using_key_pairs()?, + &plan.using_key_pairs()?, )?; - let col = if let Some(name) = internal_merged_key_name { - col.unalias() - .alias_qualified(Some(merged_key_qualifier(&name)), name) - } else { - col - }; Ok(SelectExpr::Expression(col)) } @@ -1137,7 +1037,7 @@ impl SqlToRel<'_, S> { select_expr, &[&[plan.schema()]], &plan.using_columns()?, - &plan.right_or_full_using_key_pairs()?, + &plan.using_key_pairs()?, )?; let name = self.ident_normalizer.normalize(alias); // avoiding adding an alias if the column name is the same. @@ -1238,7 +1138,6 @@ impl SqlToRel<'_, S> { plan, empty_from, planner_context, - &HashSet::new(), ) }) .collect::>>()? diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index c4a23618a896d..5c23828143b80 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -201,7 +201,8 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( .enumerate() .map(|(i, f)| match f { Expr::Alias(alias) => { - let a = Expr::Column(alias.name.clone().into()); + let a = + Expr::Column(Column::new(alias.relation.clone(), alias.name.clone())); map.insert(a.clone(), f.clone()); a } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index fb5b314f109df..0ff002e2f4439 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -263,56 +263,31 @@ fn roundtrip_statement() -> Result<()> { /// Regression for https://github.com/apache/datafusion/issues/22881. /// -/// A FULL JOIN USING / NATURAL merged key is `COALESCE(left, right)`, exposed -/// as an *unqualified* column. Selecting that merged key while ordering by a -/// *qualified* key of the same join (`ORDER BY ta.id`) must both plan and -/// survive an unparser round-trip (`plan -> SQL -> plan`). Earlier fixes either -/// errored at plan time or produced SQL that could not be re-planned. `USING` -/// and `NATURAL` are covered separately since the unparser renders them -/// differently. +/// A `USING` / `NATURAL` merged key is exposed as an unqualified column, while +/// the original side keys remain addressable through qualified references. The +/// merged key and qualified side keys may appear together because the user +/// selected them, or because `ORDER BY` folds a non-projected side key into the +/// plan. Both shapes must survive an unparser round-trip (`plan -> SQL -> plan`). #[rstest] -#[case::full_join_using( +#[case::order_by_qualified_side_key_using( "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id) \ ORDER BY ta.id" )] -#[case::natural_full_join( +#[case::order_by_qualified_side_key_natural( "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ NATURAL FULL JOIN (SELECT j2_id AS id FROM j2) tb \ ORDER BY ta.id" )] -fn roundtrip_full_join_merged_key_order_by_qualified(#[case] query: &str) -> Result<()> { - let dialect = GenericDialect {}; - let statement = Parser::new(&dialect) - .try_with_sql(query)? - .parse_statement()?; - - let context = MockContextProvider { - state: MockSessionState::default(), - }; - let sql_to_rel = SqlToRel::new(&context); - let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); - - let roundtrip_statement = plan_to_sql(&plan)?; - let plan_roundtrip = sql_to_rel - .sql_statement_to_plan(roundtrip_statement) - .unwrap(); - - assert_eq!(plan, plan_roundtrip); - - Ok(()) -} - -#[rstest] -#[case::left_join_using( +#[case::select_merged_and_left_side_key( "SELECT id, ta.id FROM (SELECT j1_id AS id FROM j1) ta \ LEFT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" )] -#[case::right_join_using( +#[case::select_merged_and_right_side_key( "SELECT id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ RIGHT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" )] -#[case::full_join_using( +#[case::select_merged_and_both_side_keys( "SELECT id, ta.id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" )] From 17093134698f2b1efb580cf076368db3dd66fbf2 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 29 Jun 2026 16:02:12 +0800 Subject: [PATCH 09/10] refactor: use a single USING merged-key qualifier Store USING and NATURAL merged keys under one reserved internal qualifier instead of embedding the column name in the qualifier. This keeps merged keys distinct from side columns while simplifying unparser recognition. Signed-off-by: Jiawei Zhao --- datafusion/expr/src/expr_rewriter/mod.rs | 14 ++++++++------ datafusion/expr/src/logical_plan/builder.rs | 6 +++--- .../optimizer/src/optimize_projections/mod.rs | 2 +- datafusion/sql/src/unparser/plan.rs | 6 +----- datafusion/sqllogictest/test_files/join.slt.part | 2 +- datafusion/sqllogictest/test_files/joins.slt | 8 ++++---- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index f406ae8568dbf..d1d47078ae5e8 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -91,11 +91,13 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { .data() } -fn merged_key_qualifier(name: &str) -> TableReference { - TableReference::bare(format!( - "{}{name}", - LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX - )) +fn alias_merged_key(expr: Expr, name: &str) -> Expr { + expr.alias_qualified( + Some(TableReference::bare( + LogicalPlanBuilder::MERGED_KEY_QUALIFIER, + )), + name, + ) } /// Resolve a normalized column to itself, or -- when it is the merged key of a @@ -134,7 +136,7 @@ pub fn merged_using_key_or_column( .end()?, _ => Expr::Column(l.clone()), }; - return Ok(expr.alias_qualified(Some(merged_key_qualifier(&name)), name)); + return Ok(alias_merged_key(expr, &name)); } Ok(Expr::Column(col)) } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 610325d9bd362..25e84fcde59d7 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -130,9 +130,9 @@ pub struct LogicalPlanBuilder { } impl LogicalPlanBuilder { - /// Reserved prefix for an internal FULL USING / NATURAL join merged key name - /// or qualifier. A real column or relation carrying this prefix would clash. - pub const MERGED_KEY_NAME_PREFIX: &str = "__datafusion_merged_key_"; + /// Reserved internal qualifier for USING / NATURAL join merged keys. + /// A real relation carrying this qualifier would clash. + pub const MERGED_KEY_QUALIFIER: &str = "__datafusion_merged_key_qualifier__"; /// Create a builder from an existing plan pub fn new(plan: LogicalPlan) -> Self { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index acdbf71d05d5c..d900037eebf15 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -1952,7 +1952,7 @@ mod tests { assert_snapshot!( optimized_plan.clone(), @r" - Projection: test.a, test.b + Projection: test.a AS a, test.b Left Join: Using test.a = test2.a TableScan: test projection=[a, b] TableScan: test2 projection=[a] diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 653d9805f11b4..822619acd8774 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -2155,11 +2155,7 @@ impl Unparser<'_> { relation: Some(relation), name, .. - }) if relation - .table() - .strip_prefix(LogicalPlanBuilder::MERGED_KEY_NAME_PREFIX) - == Some(name.as_str()) => - { + }) if relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER => { Ok(ast::SelectItem::UnnamedExpr(ast::Expr::Identifier( self.new_ident_quoted_if_needs(name.to_string()), ))) diff --git a/datafusion/sqllogictest/test_files/join.slt.part b/datafusion/sqllogictest/test_files/join.slt.part index 00bea008fc2fc..242e6c1fd930b 100644 --- a/datafusion/sqllogictest/test_files/join.slt.part +++ b/datafusion/sqllogictest/test_files/join.slt.part @@ -1167,7 +1167,7 @@ INNER JOIN t0 ON (t0.v1 + t5.v0) > 0 WHERE t0.v1 = t1.v0; ---- logical_plan -01)Projection: t1.v0, t1.v1, t5.v2, t5.v3, t5.v4, t0.v0, t0.v1 +01)Projection: t1.v0 AS v0, t1.v1 AS v1, t5.v2, t5.v3, t5.v4, t0.v0, t0.v1 02)--Inner Join: CAST(t1.v0 AS Float64) = t0.v1 Filter: t0.v1 + CAST(t5.v0 AS Float64) > Float64(0) 03)----Projection: t1.v0, t1.v1, t5.v0, t5.v2, t5.v3, t5.v4 04)------Inner Join: t1.v0 = t5.v0, t1.v1 = t5.v1 diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 1101ad6d2b14d..2203239a9e9d7 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4542,7 +4542,7 @@ query TT explain SELECT * FROM person a join person b using (id, age); ---- logical_plan -01)Projection: a.id, a.age, a.state, b.state +01)Projection: a.id AS id, a.age AS age, a.state, b.state 02)--Inner Join: a.id = b.id, a.age = b.age 03)----SubqueryAlias: a 04)------TableScan: person projection=[id, age, state] @@ -4557,7 +4557,7 @@ query TT explain SELECT age FROM (SELECT * FROM person a join person b using (id, age, state)); ---- logical_plan -01)Projection: a.age +01)Projection: a.age AS age 02)--Inner Join: a.id = b.id, a.age = b.age, a.state = b.state 03)----SubqueryAlias: a 04)------TableScan: person projection=[id, age, state] @@ -4601,7 +4601,7 @@ query TT explain SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state); ---- logical_plan -01)Projection: a.id, a.age, a.state +01)Projection: a.id AS id, a.age AS age, a.state AS state 02)--Inner Join: a.id = c.id, a.age = c.age, a.state = c.state 03)----Projection: a.id, a.age, a.state 04)------Inner Join: a.id = b.id, a.age = b.age, a.state = b.state @@ -4636,7 +4636,7 @@ query TT explain SELECT * FROM lineitem JOIN lineitem as lineitem2 USING (c1) ---- logical_plan -01)Projection: lineitem.c1 +01)Projection: lineitem.c1 AS c1 02)--Inner Join: lineitem.c1 = lineitem2.c1 03)----TableScan: lineitem projection=[c1] 04)----SubqueryAlias: lineitem2 From f4c5047fbbb99622dcf2405d044ee1b463ae2bb9 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 29 Jun 2026 21:47:40 +0800 Subject: [PATCH 10/10] fix: hide internal USING merged-key qualifier Keep wildcard output schemas on their visible input columns while still evaluating RIGHT and FULL USING keys with merged-key semantics. Also unwrap the reserved merged-key alias for GROUP BY planning so invalid ORDER BY keys report the aggregate validation error instead of a schema lookup failure. Signed-off-by: Jiawei Zhao --- datafusion/expr/src/utils.rs | 42 +++++++++++++++++-- datafusion/sql/src/select.rs | 38 ++++++++++++++++- datafusion/sql/tests/sql_integration.rs | 19 +++++++-- .../sqllogictest/test_files/join.slt.part | 2 +- datafusion/sqllogictest/test_files/joins.slt | 8 ++-- 5 files changed, 95 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index fd67f2dbb8968..f22b53ea0adb8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -21,8 +21,10 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, HashSet}; use std::sync::Arc; +use crate::conditional_expressions::CaseBuilder; use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams}; -use crate::expr_rewriter::{merged_using_key_or_column, strip_outer_reference}; +use crate::expr_rewriter::strip_outer_reference; +use crate::logical_plan::JoinType; use crate::{ BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, and, }; @@ -442,6 +444,38 @@ fn exclude_using_columns(plan: &LogicalPlan) -> Result> { Ok(excluded) } +fn merged_using_key_for_wildcard( + col: Column, + merged_keys: &[(Column, Column, JoinType)], +) -> Result { + let Some((l, r, join_type)) = + merged_keys.iter().find(|(l, r, _)| l == &col || r == &col) + else { + return Ok(Expr::Column(col)); + }; + + let (expr, visible_col) = match join_type { + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + (Expr::Column(r.clone()), r.clone()) + } + JoinType::Full => CaseBuilder::new( + None, + vec![Expr::Column(l.clone()).is_not_null()], + vec![Expr::Column(l.clone())], + Some(Box::new(Expr::Column(r.clone()))), + ) + .end() + .map(|expr| (expr, col.clone()))?, + _ => (Expr::Column(l.clone()), l.clone()), + }; + + if expr == Expr::Column(visible_col.clone()) { + Ok(expr) + } else { + Ok(expr.alias_qualified(visible_col.relation, visible_col.name)) + } +} + /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. pub fn expand_wildcard( schema: &DFSchema, @@ -463,8 +497,8 @@ pub fn expand_wildcard( columns_to_skip.extend(excluded_columns); let exprs = get_exprs_except_skipped(schema, &columns_to_skip); - // Resolve the surviving USING / NATURAL key column to the internally named - // merged key expression. + // Resolve the surviving USING / NATURAL key column to the merged key value, + // while preserving the wildcard-visible field qualifier. let merged_keys = plan.using_key_pairs()?; if merged_keys.is_empty() { return Ok(exprs); @@ -472,7 +506,7 @@ pub fn expand_wildcard( exprs .into_iter() .map(|expr| match expr { - Expr::Column(col) => merged_using_key_or_column(col, true, &merged_keys), + Expr::Column(col) => merged_using_key_for_wildcard(col, &merged_keys), other => Ok(other), }) .collect() diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 962be6af83131..a76efde9b0112 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -40,7 +40,7 @@ use datafusion_common::{ use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; -use datafusion_expr::expr::{PlannedReplaceSelectItem, Unnest, WildcardOptions}; +use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, Unnest, WildcardOptions}; use datafusion_expr::expr_rewriter::{ merged_using_key_or_column, normalize_col, normalize_sorts, }; @@ -135,6 +135,40 @@ fn normalize_col_resolving_merged_using_key( .data() } +fn unalias_internal_merged_key(expr: Expr, select_exprs: &[Expr]) -> Expr { + match expr { + Expr::Alias(Alias { + expr, + relation: Some(relation), + .. + }) if relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER => *expr, + Expr::Column(column) + if column.relation.as_ref().is_some_and(|relation| { + relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER + }) => + { + select_exprs + .iter() + .find_map(|select_expr| match select_expr { + Expr::Alias(Alias { + expr, + relation: Some(relation), + name, + .. + }) if relation.table() + == LogicalPlanBuilder::MERGED_KEY_QUALIFIER + && name == &column.name => + { + Some(*expr.clone()) + } + _ => None, + }) + .unwrap_or(Expr::Column(column)) + } + other => other, + } +} + impl SqlToRel<'_, S> { /// Generate a logic plan from an SQL select pub(super) fn select_to_plan( @@ -288,6 +322,8 @@ impl SqlToRel<'_, S> { let group_by_expr = resolve_positions_to_exprs(group_by_expr, &select_exprs)?; let group_by_expr = normalize_col(group_by_expr, &projected_plan)?; + let group_by_expr = + unalias_internal_merged_key(group_by_expr, &select_exprs); self.validate_schema_satisfies_exprs( base_plan.schema(), std::slice::from_ref(&group_by_expr), diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index d63f0556f7a8d..4b663f66e7b44 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1074,7 +1074,7 @@ fn join_with_ambiguous_column() { assert_snapshot!( plan, @r" - Projection: a.id + Projection: a.id AS id Inner Join: Using a.id = b.id SubqueryAlias: a TableScan: person @@ -1091,7 +1091,7 @@ fn natural_left_join() { assert_snapshot!( plan, @r" - Projection: a.l_item_id + Projection: a.l_item_id AS l_item_id Left Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem @@ -1108,7 +1108,7 @@ fn natural_right_join() { assert_snapshot!( plan, @r" - Projection: b.l_item_id + Projection: b.l_item_id AS l_item_id Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem @@ -2551,7 +2551,7 @@ fn join_with_using() { assert_snapshot!( plan, @r" - Projection: person.first_name, person.id + Projection: person.first_name, person.id AS id Inner Join: Using person.id = person2.id TableScan: person SubqueryAlias: person2 @@ -3502,6 +3502,17 @@ fn select_groupby_orderby_aggregate_on_non_selected_column_original_issue() { ); } +#[test] +fn natural_join_group_by_order_by_non_grouped_merged_key() { + let sql = "WITH t1 AS (SELECT 1 AS v1, 2 AS v2) \ + SELECT v1 FROM t1 AS tt1 NATURAL JOIN t1 AS tt2 \ + GROUP BY v1 ORDER BY v2"; + assert_contains!( + error_message(sql), + "Column in ORDER BY must be in GROUP BY or an aggregate function" + ); +} + fn logical_plan(sql: &str) -> Result { logical_plan_with_options(sql, ParserOptions::default()) } diff --git a/datafusion/sqllogictest/test_files/join.slt.part b/datafusion/sqllogictest/test_files/join.slt.part index 242e6c1fd930b..00bea008fc2fc 100644 --- a/datafusion/sqllogictest/test_files/join.slt.part +++ b/datafusion/sqllogictest/test_files/join.slt.part @@ -1167,7 +1167,7 @@ INNER JOIN t0 ON (t0.v1 + t5.v0) > 0 WHERE t0.v1 = t1.v0; ---- logical_plan -01)Projection: t1.v0 AS v0, t1.v1 AS v1, t5.v2, t5.v3, t5.v4, t0.v0, t0.v1 +01)Projection: t1.v0, t1.v1, t5.v2, t5.v3, t5.v4, t0.v0, t0.v1 02)--Inner Join: CAST(t1.v0 AS Float64) = t0.v1 Filter: t0.v1 + CAST(t5.v0 AS Float64) > Float64(0) 03)----Projection: t1.v0, t1.v1, t5.v0, t5.v2, t5.v3, t5.v4 04)------Inner Join: t1.v0 = t5.v0, t1.v1 = t5.v1 diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 2203239a9e9d7..1101ad6d2b14d 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4542,7 +4542,7 @@ query TT explain SELECT * FROM person a join person b using (id, age); ---- logical_plan -01)Projection: a.id AS id, a.age AS age, a.state, b.state +01)Projection: a.id, a.age, a.state, b.state 02)--Inner Join: a.id = b.id, a.age = b.age 03)----SubqueryAlias: a 04)------TableScan: person projection=[id, age, state] @@ -4557,7 +4557,7 @@ query TT explain SELECT age FROM (SELECT * FROM person a join person b using (id, age, state)); ---- logical_plan -01)Projection: a.age AS age +01)Projection: a.age 02)--Inner Join: a.id = b.id, a.age = b.age, a.state = b.state 03)----SubqueryAlias: a 04)------TableScan: person projection=[id, age, state] @@ -4601,7 +4601,7 @@ query TT explain SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state); ---- logical_plan -01)Projection: a.id AS id, a.age AS age, a.state AS state +01)Projection: a.id, a.age, a.state 02)--Inner Join: a.id = c.id, a.age = c.age, a.state = c.state 03)----Projection: a.id, a.age, a.state 04)------Inner Join: a.id = b.id, a.age = b.age, a.state = b.state @@ -4636,7 +4636,7 @@ query TT explain SELECT * FROM lineitem JOIN lineitem as lineitem2 USING (c1) ---- logical_plan -01)Projection: lineitem.c1 AS c1 +01)Projection: lineitem.c1 02)--Inner Join: lineitem.c1 = lineitem2.c1 03)----TableScan: lineitem projection=[c1] 04)----SubqueryAlias: lineitem2