Aggregations Support Partitioning::Range#23239
Conversation
Partitioning::Range
| plan.is::<RepartitionExec>() | ||
| } | ||
|
|
||
| /// Temporary check while `HashPartitioned` is being migrated to `KeyPartitioned` |
There was a problem hiding this comment.
dont know if these are worth to be shared in the crate if we are gong to eventually remove them,
| AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] | ||
| AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] | ||
| DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20), (30)], 4), file_type=parquet |
There was a problem hiding this comment.
🤔 I think we don't need the Partial there right? we should be fine with just a FinalPartitioned aggregation
There was a problem hiding this comment.
this happens in a later optimizer rule that collapses these partial -> finals where approacpriate: datafusion/physical-optimizer/src/combine_partial_final_agg.rs
There was a problem hiding this comment.
that is why it shows up correctly in the slt tests 👍
There was a problem hiding this comment.
Is it possible to move the unit-test coverage added in this file to end-to-end SLT tests instead?
It seems the same test goal can still be achieved at the SLT level, and those sql tests should be more stable across optimizer refactors.
For example, whether the initial physical plan uses a two-stage aggregation or a single-stage aggregation feels implementation-specific. A future refactor might legitimately change that plan shape, which would require updating these unit tests. At that point, it may be harder to recover the original intent of each assertion, and some coverage could accidentally be lost during the refactor. (while SLT behavior won't change a lot even after aggressive refactors)
There was a problem hiding this comment.
Indeed, tweaking the plan like this:
let plan = CombinePartialFinalAggregate::new().optimize(plan, &ConfigOptions::new())?;Produces the following plan:
AggregateExec: mode=SinglePartitioned, gby=[a@0 as a], aggr=[]
DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20), (30)], 4), file_type=parquetThere was a problem hiding this comment.
I don't have a strong opinion whether porting this to SLT tests or also leaving them here. If there are things that are covered here but not in SLT, it's probably worth also covering them in SLT.
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you! The high-level shape of the PR LGTM. I just need a bit more time to understand what EnforceDistribution is doing before finishing the review — that code looks a little intimidating 😅
| AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] | ||
| AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] | ||
| DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]}, projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20), (30)], 4), file_type=parquet |
There was a problem hiding this comment.
Is it possible to move the unit-test coverage added in this file to end-to-end SLT tests instead?
It seems the same test goal can still be achieved at the SLT level, and those sql tests should be more stable across optimizer refactors.
For example, whether the initial physical plan uses a two-stage aggregation or a single-stage aggregation feels implementation-specific. A future refactor might legitimately change that plan shape, which would require updating these unit tests. At that point, it may be harder to recover the original intent of each assertion, and some coverage could accidentally be lost during the refactor. (while SLT behavior won't change a lot even after aggressive refactors)
| .is_satisfied() | ||
| { | ||
| .is_satisfied(); | ||
| let range_satisfies_aggregate_distribution = |
There was a problem hiding this comment.
I feel this PR is already treating HashPartitioned as KeyPartitioned logically, but the formal renaming PR is left to be done in a future PR 🤔
If that's the case, should we directly implement this logic into range_partitioning.satisfaction()? Otherwise we still have to do it after the formal renaming.
There was a problem hiding this comment.
Following on our discussion in #23236, I think we'll maintain the two HashPartitioned(deprecated) and KeyPartitioned and treat them as equal.
Probably that can be done in a preliminary PR.
gabotechs
left a comment
There was a problem hiding this comment.
Approach looks good to me, besides a bit more test coverage and other suggestions, I think this is good.
| let should_add_hash_repartition = hash_necessary | ||
| && needs_hash_repartition | ||
| && !range_satisfied_for_aggregate; | ||
|
|
There was a problem hiding this comment.
The bool threading in this file is pretty mind-bending, although it seems like it's just how it is, I cannot think of a way of simplifying it...
This PR actually manages that complexity relatively well. I'll think a bit more about it and see if there's a way we can make it easier, but I don't think this is a blocker as long as we have good test coverage.
There was a problem hiding this comment.
I know... this rule was the first optimizer rule I spent time in and man, its a hard one to digest. It is on my bucket list to clean this guy up more too
| # TEST 1: Aggregate on Range Partition Column | ||
| # Scanning range_key preserves source Range partitioning metadata. | ||
| # Planning still inserts Hash repartitioning today; later optimizer PRs can | ||
| # use this baseline to show when the repartition is removed. | ||
| # Planning does not need Hash repartitioning because Range partitioning | ||
| # colocates rows with equal range_key values. | ||
| ########## | ||
|
|
There was a problem hiding this comment.
It seems like we can add a bit more coverage to this file?
For example, adding some positive and negative tests that play with subset satisfaction + range partitioning, and trying to get all the boolean code paths in enforce_distribution.rs to execute.
| .is_satisfied() | ||
| { | ||
| .is_satisfied(); | ||
| let range_satisfies_aggregate_distribution = |
There was a problem hiding this comment.
Following on our discussion in #23236, I think we'll maintain the two HashPartitioned(deprecated) and KeyPartitioned and treat them as equal.
Probably that can be done in a preliminary PR.
@gabotechs awesome, going to get that preliminary one in first and rebase it, thank you 👍 |
Which issue does this PR close?
Rationale for this change
Range partitioning can satisfy aggregate hash partitioning: equal group keys are already partitioned, even though the partitioning is not hash-based.
This is the first unary-operator implementation from the range partitioning discussion before making broader public API changes around
HashPartitioned/KeyPartitioned.What changes are included in this PR?
EnforceDistributionDistributionenum variants yet until more operators are supportedAre these changes tested?
Yes.
Are there any user-facing changes?
Yes. Range-partitioned aggregate plans can now avoid hash repartitioning.