diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index a9a0c156538f9..d1d47078ae5e8 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -22,6 +22,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; +use crate::conditional_expressions::CaseBuilder; use crate::expr::{Alias, Sort, Unnest}; use crate::logical_plan::Projection; use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; @@ -29,7 +30,7 @@ use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; use datafusion_common::TableReference; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{Column, DFSchema, Result}; +use datafusion_common::{Column, DFSchema, JoinType, Result}; mod guarantees; pub use guarantees::GuaranteeRewriter; @@ -66,12 +67,22 @@ pub trait FunctionRewrite: Debug { /// Recursively call `LogicalPlanBuilder::normalize` on all [`Column`] expressions /// in the `expr` expression tree. +/// +/// An unqualified reference to the merged key of a `USING` / `NATURAL` join is +/// resolved to a reserved internal merged-key expression via +/// [`merged_using_key_or_column`]. pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { + let merged_keys = plan.using_key_pairs()?; expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { + let was_unqualified = c.relation.is_none(); let col = LogicalPlanBuilder::normalize(plan, c)?; - Transformed::yes(Expr::Column(col)) + Transformed::yes(merged_using_key_or_column( + col, + was_unqualified, + &merged_keys, + )?) 
} else { Transformed::no(expr) } @@ -80,6 +91,56 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result { .data() } +fn alias_merged_key(expr: Expr, name: &str) -> Expr { + expr.alias_qualified( + Some(TableReference::bare( + LogicalPlanBuilder::MERGED_KEY_QUALIFIER, + )), + name, + ) +} + +/// Resolve a normalized column to itself, or -- when it is the merged key of a +/// `USING` / `NATURAL` join referenced *unqualified* -- to the merged key +/// expression with a reserved internal qualifier. +/// +/// `merged_keys` carries the `(left, right, join_type)` triples of the plan's +/// USING / NATURAL joins. The merged key resolves to the left key for +/// left-preserving joins, the right key for right-preserving joins, and +/// `COALESCE(left, right)` for `FULL`. The COALESCE is built as +/// `CASE WHEN left IS NOT NULL THEN left ELSE right END` (the form `coalesce` +/// is simplified to, buildable here without depending on the functions crate). +pub fn merged_using_key_or_column( + col: Column, + was_unqualified: bool, + merged_keys: &[(Column, Column, JoinType)], +) -> Result { + if was_unqualified + && let Some((l, r, join_type)) = + merged_keys.iter().find(|(l, r, _)| l == &col || r == &col) + { + let name = col.name.clone(); + let expr = match join_type { + // The left key is NULL-padded on right-only rows; the right key is + // always present, so the merged key is just the right column. + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + Expr::Column(r.clone()) + } + // Either side may be NULL-padded; coalesce to the present one. 
+ JoinType::Full => CaseBuilder::new( + None, + vec![Expr::Column(l.clone()).is_not_null()], + vec![Expr::Column(l.clone())], + Some(Box::new(Expr::Column(r.clone()))), + ) + .end()?, + _ => Expr::Column(l.clone()), + }; + return Ok(alias_merged_key(expr, &name)); + } + Ok(Expr::Column(col)) +} + /// See [`Column::normalize_with_schemas_and_ambiguity_check`] for usage pub fn normalize_col_with_schemas_and_ambiguity_check( expr: Expr, @@ -99,9 +160,9 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( expr.transform(|expr| { Ok({ if let Expr::Column(c) = expr { - let col = - c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; - Transformed::yes(Expr::Column(col)) + Transformed::yes(Expr::Column( + c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?, + )) } else { Transformed::no(expr) } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2ecb12c30afad..25e84fcde59d7 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -130,6 +130,10 @@ pub struct LogicalPlanBuilder { } impl LogicalPlanBuilder { + /// Reserved internal qualifier for USING / NATURAL join merged keys. + /// A real relation carrying this qualifier would clash. 
+ pub const MERGED_KEY_QUALIFIER: &str = "__datafusion_merged_key_qualifier__"; + /// Create a builder from an existing plan pub fn new(plan: LogicalPlan) -> Self { Self { @@ -855,12 +859,10 @@ impl LogicalPlanBuilder { // remove pushed down sort columns let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); + let input = Arc::unwrap_or_clone(self.plan); + let is_distinct = false; - let plan = Self::add_missing_columns( - Arc::unwrap_or_clone(self.plan), - &missing_cols, - is_distinct, - )?; + let plan = Self::add_missing_columns(input, &missing_cols, is_distinct)?; let sort_plan = LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &plan)?, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c154bc7c92fa5..605d85000cbd2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -525,6 +525,35 @@ impl LogicalPlan { Ok(using_columns) } + /// Returns the `(left, right, join_type)` join-key column triples of every + /// `USING` / `NATURAL` join in this plan. + pub fn using_key_pairs( + &self, + ) -> Result, DataFusionError> { + let mut triples = vec![]; + + self.apply_with_subqueries(|plan| { + if let LogicalPlan::Join(Join { + join_constraint: JoinConstraint::Using, + join_type, + on, + .. + }) = plan + { + for (l, r) in on { + if let (Some(l), Some(r)) = + (l.get_as_join_column(), r.get_as_join_column()) + { + triples.push((l.to_owned(), r.to_owned(), *join_type)); + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + Ok(triples) + } + /// returns the first output expression of this `LogicalPlan` node. 
pub fn head_output_expr(&self) -> Result> { match self { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 22abb454d4e6b..f22b53ea0adb8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -21,8 +21,10 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, HashSet}; use std::sync::Arc; +use crate::conditional_expressions::CaseBuilder; use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams}; use crate::expr_rewriter::strip_outer_reference; +use crate::logical_plan::JoinType; use crate::{ BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, and, }; @@ -442,6 +444,38 @@ fn exclude_using_columns(plan: &LogicalPlan) -> Result> { Ok(excluded) } +fn merged_using_key_for_wildcard( + col: Column, + merged_keys: &[(Column, Column, JoinType)], +) -> Result { + let Some((l, r, join_type)) = + merged_keys.iter().find(|(l, r, _)| l == &col || r == &col) + else { + return Ok(Expr::Column(col)); + }; + + let (expr, visible_col) = match join_type { + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + (Expr::Column(r.clone()), r.clone()) + } + JoinType::Full => CaseBuilder::new( + None, + vec![Expr::Column(l.clone()).is_not_null()], + vec![Expr::Column(l.clone())], + Some(Box::new(Expr::Column(r.clone()))), + ) + .end() + .map(|expr| (expr, col.clone()))?, + _ => (Expr::Column(l.clone()), l.clone()), + }; + + if expr == Expr::Column(visible_col.clone()) { + Ok(expr) + } else { + Ok(expr.alias_qualified(visible_col.relation, visible_col.name)) + } +} + /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. 
pub fn expand_wildcard( schema: &DFSchema, @@ -461,7 +495,21 @@ pub fn expand_wildcard( }; // Add each excluded `Column` to columns_to_skip columns_to_skip.extend(excluded_columns); - Ok(get_exprs_except_skipped(schema, &columns_to_skip)) + let exprs = get_exprs_except_skipped(schema, &columns_to_skip); + + // Resolve the surviving USING / NATURAL key column to the merged key value, + // while preserving the wildcard-visible field qualifier. + let merged_keys = plan.using_key_pairs()?; + if merged_keys.is_empty() { + return Ok(exprs); + } + exprs + .into_iter() + .map(|expr| match expr { + Expr::Column(col) => merged_using_key_for_wildcard(col, &merged_keys), + other => Ok(other), + }) + .collect() } /// Resolves an `Expr::Wildcard` to a collection of qualified `Expr::Column`'s. diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index acdbf71d05d5c..d900037eebf15 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -1952,7 +1952,7 @@ mod tests { assert_snapshot!( optimized_plan.clone(), @r" - Projection: test.a, test.b + Projection: test.a AS a, test.b Left Join: Using test.a = test2.a TableScan: test projection=[a, b] TableScan: test2 projection=[a] diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index ba7353c424f4e..a76efde9b0112 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -31,18 +31,22 @@ use crate::utils::{ use arrow::datatypes::DataType; use datafusion_common::error::DataFusionErrorBuilder; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; +use datafusion_common::{ + Column, DFSchema, DFSchemaRef, JoinType, Result, not_impl_err, plan_err, +}; use 
datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; -use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; +use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, Unnest, WildcardOptions}; use datafusion_expr::expr_rewriter::{ - normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, + merged_using_key_or_column, normalize_col, normalize_sorts, }; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, + expr_as_column_expr, find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, @@ -90,6 +94,81 @@ fn flatten_expr_groups(expr_groups: Vec>) -> Vec { expr_groups.into_iter().flatten().collect() } +/// Normalize unqualified column references in `expr` against `schemas`, then +/// resolve any USING / NATURAL merged key to its internally qualified merged-key +/// expression (see [`merged_using_key_or_column`]). +/// +/// This is a single pass on purpose: only references written *unqualified* are +/// merged keys, and that distinction is lost once normalization has qualified +/// them, so the merged-key resolution has to happen as each column is resolved. 
+fn normalize_col_resolving_merged_using_key( + expr: Expr, + schemas: &[&[&DFSchema]], + using_columns: &[HashSet], + merged_keys: &[(Column, Column, JoinType)], +) -> Result { + // Normalize column inside Unnest + if let Expr::Unnest(Unnest { expr }) = expr { + let e = normalize_col_resolving_merged_using_key( + expr.as_ref().clone(), + schemas, + using_columns, + merged_keys, + )?; + return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); + } + + expr.transform(|expr| { + Ok(if let Expr::Column(c) = expr { + let was_unqualified = c.relation.is_none(); + let col = + c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?; + Transformed::yes(merged_using_key_or_column( + col, + was_unqualified, + merged_keys, + )?) + } else { + Transformed::no(expr) + }) + }) + .data() +} + +fn unalias_internal_merged_key(expr: Expr, select_exprs: &[Expr]) -> Expr { + match expr { + Expr::Alias(Alias { + expr, + relation: Some(relation), + .. + }) if relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER => *expr, + Expr::Column(column) + if column.relation.as_ref().is_some_and(|relation| { + relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER + }) => + { + select_exprs + .iter() + .find_map(|select_expr| match select_expr { + Expr::Alias(Alias { + expr, + relation: Some(relation), + name, + .. 
+ }) if relation.table() + == LogicalPlanBuilder::MERGED_KEY_QUALIFIER + && name == &column.name => + { + Some(*expr.clone()) + } + _ => None, + }) + .unwrap_or(Expr::Column(column)) + } + other => other, + } +} + impl SqlToRel<'_, S> { /// Generate a logic plan from an SQL select pub(super) fn select_to_plan( @@ -243,6 +322,8 @@ impl SqlToRel<'_, S> { let group_by_expr = resolve_positions_to_exprs(group_by_expr, &select_exprs)?; let group_by_expr = normalize_col(group_by_expr, &projected_plan)?; + let group_by_expr = + unalias_internal_merged_key(group_by_expr, &select_exprs); self.validate_schema_satisfies_exprs( base_plan.schema(), std::slice::from_ref(&group_by_expr), @@ -879,22 +960,26 @@ impl SqlToRel<'_, S> { ); } - let mut using_columns = HashSet::new(); - expr_to_columns(&filter_expr, &mut using_columns)?; + // Resolve unqualified references against the real USING / NATURAL + // join columns so the merged key is recognized rather than + // reported as ambiguous. + let using_columns = plan.using_columns()?; + let merged_keys = plan.using_key_pairs()?; let mut schema_stack: Vec> = vec![vec![plan.schema()], fallback_schemas]; for sc in planner_context.outer_schemas_iter() { schema_stack.push(vec![sc.as_ref()]); } - let filter_expr = normalize_col_with_schemas_and_ambiguity_check( + let filter_expr = normalize_col_resolving_merged_using_key( filter_expr, schema_stack .iter() .map(|sc| sc.as_slice()) .collect::>() .as_slice(), - &[using_columns], + &using_columns, + &merged_keys, )?; Ok(LogicalPlan::Filter(Filter::try_new( @@ -952,7 +1037,6 @@ impl SqlToRel<'_, S> { ) -> Result> { let mut prepared_select_exprs = vec![]; let mut error_builder = DataFusionErrorBuilder::new(); - for expr in projection { match self.sql_select_to_rex(expr, plan, empty_from, planner_context) { Ok(expr) => prepared_select_exprs.push(expr), @@ -973,10 +1057,11 @@ impl SqlToRel<'_, S> { match sql { SelectItem::UnnamedExpr(expr) => { let expr = self.sql_to_expr(expr, plan.schema(), 
planner_context)?; - let col = normalize_col_with_schemas_and_ambiguity_check( + let col = normalize_col_resolving_merged_using_key( expr, &[&[plan.schema()]], &plan.using_columns()?, + &plan.using_key_pairs()?, )?; Ok(SelectExpr::Expression(col)) @@ -984,10 +1069,11 @@ impl SqlToRel<'_, S> { SelectItem::ExprWithAlias { expr, alias } => { let select_expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; - let col = normalize_col_with_schemas_and_ambiguity_check( + let col = normalize_col_resolving_merged_using_key( select_expr, &[&[plan.schema()]], &plan.using_columns()?, + &plan.using_key_pairs()?, )?; let name = self.ident_normalizer.normalize(alias); // avoiding adding an alias if the column name is the same. diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 37ff8145d3fd2..822619acd8774 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -2151,6 +2151,15 @@ impl Unparser<'_> { fn select_item_to_sql(&self, expr: &Expr) -> Result { match expr { + Expr::Alias(Alias { + relation: Some(relation), + name, + .. + }) if relation.table() == LogicalPlanBuilder::MERGED_KEY_QUALIFIER => { + Ok(ast::SelectItem::UnnamedExpr(ast::Expr::Identifier( + self.new_ident_quoted_if_needs(name.to_string()), + ))) + } Expr::Alias(Alias { expr, name, .. 
}) => { let inner = self.expr_to_sql(expr)?; diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 6ee66f61938f0..5c23828143b80 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -201,7 +201,8 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( .enumerate() .map(|(i, f)| match f { Expr::Alias(alias) => { - let a = Expr::Column(alias.name.clone().into()); + let a = + Expr::Column(Column::new(alias.relation.clone(), alias.name.clone())); map.insert(a.clone(), f.clone()); a } @@ -220,7 +221,20 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( }) .collect::>(); - let mut collects = p.expr.clone(); + let mut collects = p + .expr + .iter() + .map(|e| match e { + // A pure rename `Column(x) AS out` compares as its underlying + // column `x`, so an outer Projection that only renames an inner + // column (e.g. the restored merged key of a FULL USING / NATURAL + // join) still matches and can be flattened instead of nested. + Expr::Alias(alias) if matches!(alias.expr.as_ref(), Expr::Column(_)) => { + alias.expr.as_ref().clone() + } + _ => e.clone(), + }) + .collect::>(); for sort in &sort.expr { // Strip aliases from sort expressions so the comparison matches // the inner Projection's raw expressions. The optimizer may add @@ -250,7 +264,19 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( let new_exprs = p .expr .iter() - .map(|e| map.get(e).unwrap_or(e).clone()) + .map(|e| { + // For a pure rename `Column(x) AS out`, inline the inner + // definition of `x` and re-apply the outer output name, so the + // flattened SELECT keeps `out` as the column name. 
+ if let Expr::Alias(alias) = e + && matches!(alias.expr.as_ref(), Expr::Column(_)) + && let Some(inner) = map.get(alias.expr.as_ref()) + { + inner.clone().unalias().alias(alias.name.clone()) + } else { + map.get(e).unwrap_or(e).clone() + } + }) .collect::>(); // The inner Projection may define aliases that the Sort references diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 937ed9894fdfa..0ff002e2f4439 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -46,6 +46,7 @@ use datafusion_sql::unparser::dialect::{ }; use datafusion_sql::unparser::{Unparser, expr_to_sql, plan_to_sql}; use insta::assert_snapshot; +use rstest::rstest; use sqlparser::ast::Statement; use std::hash::Hash; use std::ops::Add; @@ -260,6 +261,60 @@ fn roundtrip_statement() -> Result<()> { Ok(()) } +/// Regression for https://github.com/apache/datafusion/issues/22881. +/// +/// A `USING` / `NATURAL` merged key is exposed as an unqualified column, while +/// the original side keys remain addressable through qualified references. The +/// merged key and qualified side keys may appear together because the user +/// selected them, or because `ORDER BY` folds a non-projected side key into the +/// plan. Both shapes must survive an unparser round-trip (`plan -> SQL -> plan`). 
+#[rstest] +#[case::order_by_qualified_side_key_using( + "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ + FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id) \ + ORDER BY ta.id" +)] +#[case::order_by_qualified_side_key_natural( + "SELECT id FROM (SELECT j1_id AS id FROM j1) ta \ + NATURAL FULL JOIN (SELECT j2_id AS id FROM j2) tb \ + ORDER BY ta.id" +)] +#[case::select_merged_and_left_side_key( + "SELECT id, ta.id FROM (SELECT j1_id AS id FROM j1) ta \ + LEFT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +#[case::select_merged_and_right_side_key( + "SELECT id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ + RIGHT JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +#[case::select_merged_and_both_side_keys( + "SELECT id, ta.id, tb.id FROM (SELECT j1_id AS id FROM j1) ta \ + FULL JOIN (SELECT j2_id AS id FROM j2) tb USING (id)" +)] +fn roundtrip_join_using_merged_key_with_qualified_side_keys( + #[case] query: &str, +) -> Result<()> { + let dialect = GenericDialect {}; + let statement = Parser::new(&dialect) + .try_with_sql(query)? 
+ .parse_statement()?; + + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); + + let roundtrip_statement = plan_to_sql(&plan)?; + let plan_roundtrip = sql_to_rel + .sql_statement_to_plan(roundtrip_statement) + .unwrap(); + + assert_eq!(plan, plan_roundtrip); + + Ok(()) +} + #[test] fn roundtrip_crossjoin() -> Result<()> { let query = "select j1.j1_id, j2.j2_string from j1, j2"; diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 88b7b43eb73f6..4b663f66e7b44 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1074,7 +1074,7 @@ fn join_with_ambiguous_column() { assert_snapshot!( plan, @r" - Projection: a.id + Projection: a.id AS id Inner Join: Using a.id = b.id SubqueryAlias: a TableScan: person @@ -1091,7 +1091,7 @@ fn natural_left_join() { assert_snapshot!( plan, @r" - Projection: a.l_item_id + Projection: a.l_item_id AS l_item_id Left Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem @@ -1108,7 +1108,7 @@ fn natural_right_join() { assert_snapshot!( plan, @r" - Projection: a.l_item_id + Projection: b.l_item_id AS l_item_id Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, a.price = b.price SubqueryAlias: a TableScan: lineitem @@ -2551,7 +2551,7 @@ fn join_with_using() { assert_snapshot!( plan, @r" - Projection: person.first_name, person.id + Projection: person.first_name, person.id AS id Inner Join: Using person.id = person2.id TableScan: person SubqueryAlias: person2 @@ -3502,6 +3502,17 @@ fn select_groupby_orderby_aggregate_on_non_selected_column_original_issue() { ); } +#[test] +fn 
natural_join_group_by_order_by_non_grouped_merged_key() { + let sql = "WITH t1 AS (SELECT 1 AS v1, 2 AS v2) \ + SELECT v1 FROM t1 AS tt1 NATURAL JOIN t1 AS tt2 \ + GROUP BY v1 ORDER BY v2"; + assert_contains!( + error_message(sql), + "Column in ORDER BY must be in GROUP BY or an aggregate function" + ); +} + fn logical_plan(sql: &str) -> Result { logical_plan_with_options(sql, ParserOptions::default()) } @@ -5201,7 +5212,8 @@ fn test_using_join_wildcard_schema() { [ "t1.a".to_string(), "t1.b".to_string(), - "t2.c".to_string(), + // RIGHT join: the merged key `c` is the never-NULL-padded right side + "t3.c".to_string(), "t3.d".to_string() ] ); diff --git a/datafusion/sqllogictest/test_files/join_using_merged_key.slt b/datafusion/sqllogictest/test_files/join_using_merged_key.slt new file mode 100644 index 0000000000000..98df116cf26a3 --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_using_merged_key.slt @@ -0,0 +1,234 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Merged key of USING / NATURAL joins. +# +# The single merged key column exposed by a USING / NATURAL join equals +# COALESCE(left.k, right.k): for an outer join a row present on only one side +# must expose the key from the present side, not NULL. 
This matters for +# RIGHT / FULL joins, whose left key is NULL-padded on right-only rows. +# +# Regression coverage for https://github.com/apache/datafusion/issues/22881, +# where the merged key was resolved to the left column unconditionally and so +# came out NULL for right-only rows of RIGHT / FULL joins. +# +# Tables: a has keys {1,2,3}, b has keys {2,3,4}. +# matched: 2, 3 ; left-only: 1 ; right-only: 4 + +statement ok +create table a(k int, x int) as values (1, 10), (2, 20), (3, 30); + +statement ok +create table b(k int, y int) as values (2, 200), (3, 300), (4, 400); + +########## +# Already correct: INNER / LEFT (left key is never NULL-padded) +########## + +query I nosort +SELECT k FROM a INNER JOIN b USING (k) ORDER BY k +---- +2 +3 + +query I nosort +SELECT k FROM a LEFT JOIN b USING (k) ORDER BY k +---- +1 +2 +3 + +########## +# The bug: RIGHT / FULL merged key must come from the preserved side +########## + +query I nosort +SELECT k FROM a RIGHT JOIN b USING (k) ORDER BY k NULLS LAST +---- +2 +3 +4 + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY k NULLS LAST +---- +1 +2 +3 +4 + +query I nosort +SELECT k FROM a NATURAL RIGHT JOIN b ORDER BY k NULLS LAST +---- +2 +3 +4 + +query I nosort +SELECT k FROM a NATURAL FULL JOIN b ORDER BY k NULLS LAST +---- +1 +2 +3 +4 + +########## +# Downstream of the merged key: WHERE must see the coalesced value, not the +# wrong NULL (ORDER BY is covered separately below) +########## + +# right-only row (k = 4) must be findable by its merged key +query III nosort +SELECT k, x, y FROM a FULL JOIN b USING (k) WHERE k = 4 +---- +4 NULL 400 + +########## +# Guards / reference: these are already correct and must stay correct +########## + +# qualified access to each side is independent of the merged key +query II nosort +SELECT a.k, b.k FROM a FULL JOIN b USING (k) ORDER BY coalesce(a.k, b.k) +---- +1 NULL +2 2 +3 3 +NULL 4 + +# the merged key is distinct from an explicitly qualified side key +query II rowsort 
+SELECT k, a.k FROM a LEFT JOIN b USING (k) +---- +1 1 +2 2 +3 3 + +query II rowsort +SELECT k, b.k FROM a RIGHT JOIN b USING (k) +---- +2 2 +3 3 +4 4 + +query III rowsort +SELECT k, a.k, b.k FROM a FULL JOIN b USING (k) +---- +1 1 NULL +2 2 2 +3 3 3 +4 NULL 4 + +# the explicit form is the reference for what the merged key should equal +query I nosort +SELECT coalesce(a.k, b.k) AS k FROM a FULL JOIN b ON a.k = b.k ORDER BY k +---- +1 +2 +3 +4 + +########## +# Regression: ORDER BY a *qualified* key while selecting the FULL merged key. +# The merged k is COALESCE(a.k, b.k), exposed unqualified; ORDER BY a.k folds +# the qualified a.k into the same projection. This must plan (it regressed to a +# schema-ambiguity error) and sort by a.k, not by the merged key: the right-only +# row (a.k IS NULL) sorts last, the rest descend by a.k -- so the output differs +# from ORDER BY k DESC (which would be 4, 3, 2, 1). +########## + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY a.k DESC NULLS LAST +---- +3 +2 +1 +4 + +query I nosort +SELECT k FROM a NATURAL FULL JOIN b ORDER BY a.k DESC NULLS LAST +---- +3 +2 +1 +4 + +########## +# Regression: a *qualified* key and the *unqualified* merged key together in one +# ORDER BY (`ORDER BY a.k DESC NULLS LAST, k`). On main this was legal (the merged +# key resolved to a.k, so both sort keys were a.k); the merged-key fix must keep +# it legal -- renaming the merged key to dodge the `{k, a.k}` collision must also +# rewrite the unqualified `k` reference in the sort. a.k is unique here so the +# secondary `k` never breaks a tie, but the shape still exercises that path. +########## + +query I nosort +SELECT k FROM a FULL JOIN b USING (k) ORDER BY a.k DESC NULLS LAST, k +---- +3 +2 +1 +4 + +########## +# Wildcard over the join: the merged key must appear once and carry the right +# value. 
(The materialized merged column must be deduplicated under `SELECT *`.) +########## + +query III nosort +SELECT * FROM a RIGHT JOIN b USING (k) ORDER BY k NULLS LAST +---- +2 20 200 +3 30 300 +4 NULL 400 + +query III nosort +SELECT * FROM a FULL JOIN b USING (k) ORDER BY k NULLS LAST +---- +1 10 NULL +2 20 200 +3 30 300 +4 NULL 400 + +statement ok +drop table a + +statement ok +drop table b + +########## +# ORDER BY on the merged key: a right-only row with a small key must sort by +# its real value, not as a NULL +########## + +statement ok +create table c(k int, x int) as values (5, 50); + +statement ok +create table d(k int, y int) as values (0, 0), (5, 500); + +query II nosort +SELECT x, k FROM c RIGHT JOIN d USING (k) ORDER BY k +---- +NULL 0 +50 5 + +statement ok +drop table c + +statement ok +drop table d