Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use crate::conditional_expressions::CaseBuilder;
use crate::expr::{Alias, Sort, Unnest};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};

use datafusion_common::TableReference;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_common::{Column, DFSchema, JoinType, Result};

mod guarantees;
pub use guarantees::GuaranteeRewriter;
Expand Down Expand Up @@ -66,12 +67,22 @@ pub trait FunctionRewrite: Debug {

/// Recursively call `LogicalPlanBuilder::normalize` on all [`Column`] expressions
/// in the `expr` expression tree.
///
/// An unqualified reference to the merged key of a `USING` / `NATURAL` join is
/// resolved to a reserved internal merged-key expression via
/// [`merged_using_key_or_column`].
pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
let merged_keys = plan.using_key_pairs()?;
expr.transform(|expr| {
Ok({
if let Expr::Column(c) = expr {
let was_unqualified = c.relation.is_none();
let col = LogicalPlanBuilder::normalize(plan, c)?;
Transformed::yes(Expr::Column(col))
Transformed::yes(merged_using_key_or_column(
col,
was_unqualified,
&merged_keys,
)?)
} else {
Transformed::no(expr)
}
Expand All @@ -80,6 +91,56 @@ pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
.data()
}

fn alias_merged_key(expr: Expr, name: &str) -> Expr {
expr.alias_qualified(
Some(TableReference::bare(
LogicalPlanBuilder::MERGED_KEY_QUALIFIER,
)),
name,
)
}

/// Resolve a normalized column to itself, or -- when it is the merged key of a
/// `USING` / `NATURAL` join referenced *unqualified* -- to the merged key
/// expression with a reserved internal qualifier.
///
/// `merged_keys` carries the `(left, right, join_type)` triples of the plan's
/// USING / NATURAL joins. The merged key resolves to the left key for
/// left-preserving joins, the right key for right-preserving joins, and
/// `COALESCE(left, right)` for `FULL`. The COALESCE is built as
/// `CASE WHEN left IS NOT NULL THEN left ELSE right END` (the form `coalesce`
/// is simplified to, buildable here without depending on the functions crate).
pub fn merged_using_key_or_column(
col: Column,
was_unqualified: bool,
merged_keys: &[(Column, Column, JoinType)],
) -> Result<Expr> {
if was_unqualified
&& let Some((l, r, join_type)) =
merged_keys.iter().find(|(l, r, _)| l == &col || r == &col)
{
let name = col.name.clone();
let expr = match join_type {
// The left key is NULL-padded on right-only rows; the right key is
// always present, so the merged key is just the right column.
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
Expr::Column(r.clone())
}
// Either side may be NULL-padded; coalesce to the present one.
JoinType::Full => CaseBuilder::new(
None,
vec![Expr::Column(l.clone()).is_not_null()],
vec![Expr::Column(l.clone())],
Some(Box::new(Expr::Column(r.clone()))),
)
.end()?,
_ => Expr::Column(l.clone()),
};
return Ok(alias_merged_key(expr, &name));
}
Ok(Expr::Column(col))
}

/// See [`Column::normalize_with_schemas_and_ambiguity_check`] for usage
pub fn normalize_col_with_schemas_and_ambiguity_check(
expr: Expr,
Expand All @@ -99,9 +160,9 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
expr.transform(|expr| {
Ok({
if let Expr::Column(c) = expr {
let col =
c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?;
Transformed::yes(Expr::Column(col))
Transformed::yes(Expr::Column(
c.normalize_with_schemas_and_ambiguity_check(schemas, using_columns)?,
))
} else {
Transformed::no(expr)
}
Expand Down
12 changes: 7 additions & 5 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ pub struct LogicalPlanBuilder {
}

impl LogicalPlanBuilder {
/// Reserved internal qualifier for USING / NATURAL join merged keys.
/// A real relation carrying this qualifier would clash.
pub const MERGED_KEY_QUALIFIER: &str = "__datafusion_merged_key_qualifier__";

/// Create a builder from an existing plan
pub fn new(plan: LogicalPlan) -> Self {
Self {
Expand Down Expand Up @@ -855,12 +859,10 @@ impl LogicalPlanBuilder {
// remove pushed down sort columns
let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

let input = Arc::unwrap_or_clone(self.plan);

let is_distinct = false;
let plan = Self::add_missing_columns(
Arc::unwrap_or_clone(self.plan),
&missing_cols,
is_distinct,
)?;
let plan = Self::add_missing_columns(input, &missing_cols, is_distinct)?;

let sort_plan = LogicalPlan::Sort(Sort {
expr: normalize_sorts(sorts, &plan)?,
Expand Down
29 changes: 29 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,35 @@ impl LogicalPlan {
Ok(using_columns)
}

/// Returns the `(left, right, join_type)` join-key column triples of every
/// `USING` / `NATURAL` join in this plan.
pub fn using_key_pairs(
&self,
) -> Result<Vec<(Column, Column, JoinType)>, DataFusionError> {
let mut triples = vec![];

self.apply_with_subqueries(|plan| {
if let LogicalPlan::Join(Join {
join_constraint: JoinConstraint::Using,
join_type,
on,
..
}) = plan
{
for (l, r) in on {
if let (Some(l), Some(r)) =
(l.get_as_join_column(), r.get_as_join_column())
{
triples.push((l.to_owned(), r.to_owned(), *join_type));
}
}
}
Ok(TreeNodeRecursion::Continue)
})?;

Ok(triples)
}

/// returns the first output expression of this `LogicalPlan` node.
pub fn head_output_expr(&self) -> Result<Option<Expr>> {
match self {
Expand Down
50 changes: 49 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::cmp::Ordering;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;

use crate::conditional_expressions::CaseBuilder;
use crate::expr::{Alias, Sort, WildcardOptions, WindowFunctionParams};
use crate::expr_rewriter::strip_outer_reference;
use crate::logical_plan::JoinType;
use crate::{
BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, and,
};
Expand Down Expand Up @@ -442,6 +444,38 @@ fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
Ok(excluded)
}

fn merged_using_key_for_wildcard(
col: Column,
merged_keys: &[(Column, Column, JoinType)],
) -> Result<Expr> {
let Some((l, r, join_type)) =
merged_keys.iter().find(|(l, r, _)| l == &col || r == &col)
else {
return Ok(Expr::Column(col));
};

let (expr, visible_col) = match join_type {
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
(Expr::Column(r.clone()), r.clone())
}
JoinType::Full => CaseBuilder::new(
None,
vec![Expr::Column(l.clone()).is_not_null()],
vec![Expr::Column(l.clone())],
Some(Box::new(Expr::Column(r.clone()))),
)
.end()
.map(|expr| (expr, col.clone()))?,
_ => (Expr::Column(l.clone()), l.clone()),
};

if expr == Expr::Column(visible_col.clone()) {
Ok(expr)
} else {
Ok(expr.alias_qualified(visible_col.relation, visible_col.name))
}
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub fn expand_wildcard(
schema: &DFSchema,
Expand All @@ -461,7 +495,21 @@ pub fn expand_wildcard(
};
// Add each excluded `Column` to columns_to_skip
columns_to_skip.extend(excluded_columns);
Ok(get_exprs_except_skipped(schema, &columns_to_skip))
let exprs = get_exprs_except_skipped(schema, &columns_to_skip);

// Resolve the surviving USING / NATURAL key column to the merged key value,
// while preserving the wildcard-visible field qualifier.
let merged_keys = plan.using_key_pairs()?;
if merged_keys.is_empty() {
return Ok(exprs);
}
exprs
.into_iter()
.map(|expr| match expr {
Expr::Column(col) => merged_using_key_for_wildcard(col, &merged_keys),
other => Ok(other),
})
.collect()
}

/// Resolves an `Expr::Wildcard` to a collection of qualified `Expr::Column`'s.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,7 @@ mod tests {
assert_snapshot!(
optimized_plan.clone(),
@r"
Projection: test.a, test.b
Projection: test.a AS a, test.b
Left Join: Using test.a = test2.a
TableScan: test projection=[a, b]
TableScan: test2 projection=[a]
Expand Down
Loading
Loading