diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 942432239612e..062ac7ca09758 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1050,12 +1050,12 @@ fn join_after_agg_alias() -> Result<()> { @r" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)] AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[] - RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10 + RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10, max_aggr_partition_factor=4 AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[] - RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10 + RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10, max_aggr_partition_factor=4 AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 76cb59a305a5f..96f95a3715770 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -756,9 +756,13 @@ fn add_hash_on_top( // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.prefer_existing_sort`). let partitioning = dist.create_partitioning(n_target); - let repartition = - RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)? - .with_preserve_order(); + let max_aggr_partition_factor = hash_aggregate_partition_factor(&input.plan); + let repartition = RepartitionExec::try_new_with_max_aggr_partition_factor( + Arc::clone(&input.plan), + partitioning, + max_aggr_partition_factor, + )? + .with_preserve_order(); let plan = Arc::new(repartition) as _; return Ok(DistributionContext::new(plan, true, vec![input])); @@ -767,6 +771,16 @@ fn add_hash_on_top( Ok(input) } +fn hash_aggregate_partition_factor(input: &Arc) -> usize { + const DEFAULT_HASH_AGGREGATE_PARTITION_FACTOR: usize = 16; + + input + .downcast_ref::() + .filter(|aggregate| aggregate.mode() == &AggregateMode::Partial) + .map(|_| DEFAULT_HASH_AGGREGATE_PARTITION_FACTOR) + .unwrap_or(1) +} + /// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator /// on top of the given plan node to satisfy a single partition requirement /// while preserving ordering constraints. diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0e0b7e3b24892..0befe996ef4ca 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -138,6 +138,15 @@ harness = false name = "hash_join_semi_anti" required-features = ["test_utils"] +[[bench]] +harness = false +name = "hash_aggregate_output_coalescer" +required-features = ["test_utils"] + +[[bench]] +harness = false +name = "hash_group_by_reuse" + [[bench]] harness = false name = "multi_group_by" diff --git a/datafusion/physical-plan/benches/hash_aggregate_output_coalescer.rs b/datafusion/physical-plan/benches/hash_aggregate_output_coalescer.rs new file mode 100644 index 0000000000000..f6682d1276fec --- /dev/null +++ b/datafusion/physical-plan/benches/hash_aggregate_output_coalescer.rs @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for `HashAggregateOutputPartitionCoalescer` itself. +//! +//! This intentionally bypasses `RepartitionExec` / partitioning / async channels +//! and measures only: +//! +//! ```text +//! push_batch(relative_partition, RecordBatch) +//! -> optional RecordBatch::slice +//! -> flush_buffered +//! -> concat_batches +//! -> set_partition_runs_metadata +//! ``` + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use datafusion_physical_plan::repartition::HashAggregateOutputPartitionCoalescer; + +const TARGET_BATCH_SIZE: usize = 8129; +const NUM_ROWS: usize = 1_000_000; +const MANY_SMALL_BATCH_SIZE: usize = 1024; +const INTERLEAVED_BATCH_SIZE: usize = 128; +const PARTITIONED_BATCH_SIZE: usize = 1024; +const NUM_RELATIVE_PARTITIONS: usize = 8; + +#[derive(Clone, Copy)] +enum SchemaLayout { + IntOnly, + MixedString, +} + +impl SchemaLayout { + fn name(self) -> &'static str { + match self { + Self::IntOnly => "int_only", + Self::MixedString => "mixed_string", + } + } + + fn schema(self) -> SchemaRef { + let fields = match self { + Self::IntOnly => vec![ + Field::new("k", DataType::Int32, false), + Field::new("v0", DataType::Int32, false), + Field::new("v1", DataType::Int32, false), + ], + Self::MixedString => vec![ + Field::new("k", DataType::Int32, false), + Field::new("v0", DataType::Int32, false), + Field::new("v1", DataType::Utf8, false), + ], + }; + Arc::new(Schema::new(fields)) + } +} + +#[derive(Clone, Copy)] +enum InputShape { + ManySmallBatches, + InterleavedPartitions, + PartitionedRuns, +} + +impl InputShape { + fn name(self) -> &'static str { + match self { + Self::ManySmallBatches => "many_1024_batches", + Self::InterleavedPartitions => "interleaved_128_batches_8_partitions", + Self::PartitionedRuns => "partitioned_1024_batches_8_runs", + } + } +} + +struct BenchInput { + schema: SchemaRef, + batches: Vec<(usize, RecordBatch)>, + num_rows: usize, + shape: InputShape, +} + +impl BenchInput { + fn new(layout: SchemaLayout, shape: InputShape) -> Self { + let schema = layout.schema(); + let batches = make_input_batches(&schema, layout, shape); + let num_rows = batches.iter().map(|(_, batch)| batch.num_rows()).sum(); + Self { + schema, + batches, + num_rows, + shape, + } + } +} + +fn make_input_batches( + schema: &SchemaRef, + layout: SchemaLayout, + shape: InputShape, +) -> Vec<(usize, RecordBatch)> { + match shape { + InputShape::ManySmallBatches => { + make_round_robin_batches(schema, layout, NUM_ROWS, MANY_SMALL_BATCH_SIZE, 1) + } + InputShape::InterleavedPartitions => make_round_robin_batches( + schema, + layout, + NUM_ROWS, + INTERLEAVED_BATCH_SIZE, + NUM_RELATIVE_PARTITIONS, + ), + InputShape::PartitionedRuns => make_partitioned_run_batches( + schema, + layout, + NUM_ROWS, + PARTITIONED_BATCH_SIZE, + NUM_RELATIVE_PARTITIONS, + ), + } +} + +fn make_round_robin_batches( + schema: &SchemaRef, + layout: SchemaLayout, + num_rows: usize, + batch_size: usize, + num_relative_partitions: usize, +) -> Vec<(usize, RecordBatch)> { + (0..num_rows.div_ceil(batch_size)) + .map(|batch_idx| { + let start = batch_idx * batch_size; + let len = batch_size.min(num_rows - start); + let relative_partition = batch_idx % num_relative_partitions; + (relative_partition, make_batch(schema, layout, start, len)) + }) + .collect() +} + +fn make_partitioned_run_batches( + schema: &SchemaRef, + layout: SchemaLayout, + num_rows: usize, + batch_size: usize, + num_relative_partitions: usize, +) -> Vec<(usize, RecordBatch)> { + let run_size = batch_size / num_relative_partitions; + (0..num_rows.div_ceil(batch_size)) + .map(|batch_idx| { + let start = batch_idx * batch_size; + let len = batch_size.min(num_rows - start); + let batch = make_batch(schema, layout, start, len); + let mut runs = Vec::new(); + let mut remaining = len; + for relative_partition in 0..num_relative_partitions { + if remaining == 0 { + break; + } + let run_len = run_size.min(remaining); + runs.push(format!("{relative_partition}:{run_len}")); + remaining -= run_len; + } + let mut batch = batch; + batch.schema_metadata_mut().insert( + "datafusion.internal.hash_aggr_partition_runs".to_string(), + runs.join(","), + ); + (0, batch) + }) + .collect() +} + +fn make_batch( + schema: &SchemaRef, + layout: SchemaLayout, + batch_start: usize, + batch_size: usize, +) -> RecordBatch { + let keys = make_int32_column(batch_start, batch_size, 0); + let values = make_int32_column(batch_start, batch_size, 1); + let columns = match layout { + SchemaLayout::IntOnly => { + vec![keys, values, make_int32_column(batch_start, batch_size, 2)] + } + SchemaLayout::MixedString => { + vec![keys, values, make_string_column(batch_start, batch_size)] + } + }; + + RecordBatch::try_new(Arc::clone(schema), columns).unwrap() +} + +fn make_int32_column(batch_start: usize, batch_size: usize, col_idx: usize) -> ArrayRef { + Arc::new(Int32Array::from_iter_values((0..batch_size).map( + |row_idx| { + let value = batch_start + row_idx; + ((value.wrapping_mul(31 + col_idx) + col_idx) % 1_000_003) as i32 + }, + ))) +} + +fn make_string_column(batch_start: usize, batch_size: usize) -> ArrayRef { + Arc::new(StringArray::from_iter_values((0..batch_size).map( + |row_idx| { + let value = batch_start + row_idx; + format!("payload_{value:016}") + }, + ))) +} + +fn run_coalescer(input: &BenchInput) -> usize { + let mut coalescer = HashAggregateOutputPartitionCoalescer::new( + &input.schema, + TARGET_BATCH_SIZE, + NUM_RELATIVE_PARTITIONS, + ); + + let mut output_rows = 0; + for (relative_partition, batch) in &input.batches { + if matches!(input.shape, InputShape::PartitionedRuns) { + coalescer.push_partitioned_batch(batch.clone()).unwrap(); + } else { + coalescer + .push_batch(*relative_partition, batch.clone()) + .unwrap(); + } + while let Some(batch) = coalescer.next_completed_batch() { + output_rows += batch.num_rows(); + black_box(batch); + } + } + + coalescer.finish().unwrap(); + while let Some(batch) = coalescer.next_completed_batch() { + output_rows += batch.num_rows(); + black_box(batch); + } + + output_rows +} + +fn bench_hash_aggregate_output_coalescer(c: &mut Criterion) { + let mut group = c.benchmark_group("hash_aggregate_output_coalescer"); + group.throughput(Throughput::Elements(NUM_ROWS as u64)); + + for layout in [SchemaLayout::IntOnly, SchemaLayout::MixedString] { + for shape in [ + InputShape::ManySmallBatches, + InputShape::InterleavedPartitions, + InputShape::PartitionedRuns, + ] { + let input = BenchInput::new(layout, shape); + let id = BenchmarkId::new(layout.name(), shape.name()); + group.bench_function(id, |bencher| { + bencher.iter(|| { + let output_rows = run_coalescer(&input); + assert_eq!(output_rows, input.num_rows); + }); + }); + } + } + + group.finish(); +} + +criterion_group!(benches, bench_hash_aggregate_output_coalescer); +criterion_main!(benches); diff --git a/datafusion/physical-plan/benches/hash_group_by_reuse.rs b/datafusion/physical-plan/benches/hash_group_by_reuse.rs new file mode 100644 index 0000000000000..97f3d1df90e3b --- /dev/null +++ b/datafusion/physical-plan/benches/hash_group_by_reuse.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for comparing one large hash GROUP BY map with a reused map over +//! aggregate partitions. +//! +//! The input is a plain `Vec`. The `single` benchmark interns the +//! batches directly. The `reuse_clear_shrink` benchmark first slices the same +//! batches into `Vec>`, where the outer vector is aggregate +//! partition, then interns one partition at a time and clears the group values +//! for reuse. + +use arrow::array::{ArrayRef, Int32Array, StringViewArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use datafusion_physical_plan::aggregates::group_values::GroupValues; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn; +use std::env; +use std::hint::black_box; +use std::sync::Arc; + +const DEFAULT_BATCH_SIZE: usize = 8129; +const DEFAULT_NUM_ROWS: usize = 10_000_000; +const DEFAULT_NUM_AGGR_PARTITIONS: usize = 16; +const BENCH_NUM_ROWS_ENV: &str = "DF_HASH_GROUP_BY_REUSE_ROWS"; +const BENCH_NUM_AGGR_PARTITIONS_ENV: &str = "DF_HASH_GROUP_BY_REUSE_PARTITIONS"; + +#[derive(Clone, Copy)] +enum SchemaLayout { + IntOnly, + MixedStringView, +} + +impl SchemaLayout { + fn schema(self) -> SchemaRef { + let fields = match self { + Self::IntOnly => vec![ + Field::new("col_0", DataType::Int32, false), + Field::new("col_1", DataType::Int32, false), + Field::new("col_2", DataType::Int32, false), + ], + Self::MixedStringView => vec![ + Field::new("col_0", DataType::Int32, false), + Field::new("col_1", DataType::Utf8View, false), + Field::new("col_2", DataType::Int32, false), + ], + }; + Arc::new(Schema::new(fields)) + } + + fn num_cols(self) -> usize { + 3 + } +} + +struct GroupByInput { + schema: SchemaRef, + batches: Vec, + num_rows: usize, +} + +impl GroupByInput { + fn new(layout: SchemaLayout, num_rows: usize, num_groups: usize) -> Self { + let schema = layout.schema(); + let batches = make_batches(&schema, layout, num_rows, num_groups); + Self { + schema, + batches, + num_rows, + } + } +} + +fn make_batches( + schema: &SchemaRef, + layout: SchemaLayout, + num_rows: usize, + num_groups: usize, +) -> Vec { + let num_batches = num_rows.div_ceil(DEFAULT_BATCH_SIZE); + (0..num_batches) + .map(|batch_idx| { + let batch_start = batch_idx * DEFAULT_BATCH_SIZE; + let batch_size = DEFAULT_BATCH_SIZE.min(num_rows - batch_start); + let columns = make_columns(layout, batch_start, batch_size, num_groups); + RecordBatch::try_new(Arc::clone(schema), columns).unwrap() + }) + .collect() +} + +fn make_columns( + layout: SchemaLayout, + batch_start: usize, + batch_size: usize, + num_groups: usize, +) -> Vec { + (0..layout.num_cols()) + .map(|col_idx| make_column(layout, batch_start, batch_size, col_idx, num_groups)) + .collect() +} + +fn make_column( + layout: SchemaLayout, + batch_start: usize, + batch_size: usize, + col_idx: usize, + num_groups: usize, +) -> ArrayRef { + match (layout, col_idx) { + (SchemaLayout::MixedStringView, 1) => { + make_string_view_column(batch_start, batch_size, col_idx, num_groups) + } + _ => make_int32_column(batch_start, batch_size, col_idx, num_groups), + } +} + +fn make_int32_column( + batch_start: usize, + batch_size: usize, + col_idx: usize, + num_groups: usize, +) -> ArrayRef { + let values: Vec = (0..batch_size) + .map(|row_idx| group_value(batch_start + row_idx, col_idx, num_groups) as i32) + .collect(); + Arc::new(Int32Array::from(values)) +} + +fn make_string_view_column( + batch_start: usize, + batch_size: usize, + col_idx: usize, + num_groups: usize, +) -> ArrayRef { + let values: Vec = (0..batch_size) + .map(|row_idx| { + let value = group_value(batch_start + row_idx, col_idx, num_groups); + format!("group_key_{value:016}") + }) + .collect(); + Arc::new(StringViewArray::from(values)) +} + +fn group_value(row_idx: usize, col_idx: usize, num_groups: usize) -> usize { + let group_idx = row_idx % num_groups; + let per_col_card = (num_groups as f64).powf(1.0 / 3.0).ceil() as usize; + let divisor = per_col_card.pow(col_idx as u32); + (group_idx / divisor) % per_col_card +} + +fn split_by_aggr_partition( + batches: &[RecordBatch], + _num_rows: usize, + num_aggr_partitions: usize, +) -> Vec> { + let mut partitions = vec![Vec::new(); num_aggr_partitions]; + + for batch in batches { + // Step 1: Split each input batch into N contiguous slices, where N is + // the configured number of aggregate partitions. This models the + // coalesced final aggregate input layout directly and keeps the + // slicing rule local to each batch. + let rows_per_slice = batch.num_rows().div_ceil(num_aggr_partitions); + + // Step 2: Append slice K of this batch to aggregate partition K. + // Empty trailing slices are skipped when the batch has fewer rows than + // the configured number of aggregate partitions. + for (partition_idx, partition_batches) in partitions.iter_mut().enumerate() { + let offset = partition_idx * rows_per_slice; + if offset >= batch.num_rows() { + break; + } + + let slice_len = rows_per_slice.min(batch.num_rows() - offset); + partition_batches.push(batch.slice(offset, slice_len)); + } + } + + partitions +} + +fn intern_batch( + group_values: &mut GroupValuesColumn, + batch: &RecordBatch, + groups: &mut Vec, +) { + groups.clear(); + group_values.intern(batch.columns(), groups).unwrap(); + black_box(groups.len()); +} + +fn bench_single(input: &GroupByInput) { + let mut group_values = + GroupValuesColumn::::try_new(Arc::clone(&input.schema)).unwrap(); + let mut groups = Vec::with_capacity(DEFAULT_BATCH_SIZE); + + for batch in &input.batches { + intern_batch(&mut group_values, batch, &mut groups); + } + + black_box(group_values.len()); +} + +fn bench_reuse_clear_shrink(input: &GroupByInput, num_aggr_partitions: usize) { + let partitions = + split_by_aggr_partition(&input.batches, input.num_rows, num_aggr_partitions); + let rows_per_partition = input.num_rows.div_ceil(num_aggr_partitions); + let mut group_values = + GroupValuesColumn::::try_new(Arc::clone(&input.schema)).unwrap(); + let mut groups = Vec::with_capacity(DEFAULT_BATCH_SIZE); + + for partition_batches in partitions { + for batch in partition_batches { + intern_batch(&mut group_values, &batch, &mut groups); + } + black_box(group_values.len()); + group_values.clear_shrink(rows_per_partition); + } +} + +fn num_rows_from_env() -> usize { + env::var(BENCH_NUM_ROWS_ENV) + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(DEFAULT_NUM_ROWS) +} + +fn num_aggr_partitions_from_env() -> usize { + env::var(BENCH_NUM_AGGR_PARTITIONS_ENV) + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(DEFAULT_NUM_AGGR_PARTITIONS) +} + +fn bench_hash_group_by_reuse(c: &mut Criterion) { + let mut group = c.benchmark_group("hash_group_by_reuse"); + group.sample_size(10); + + let num_rows = num_rows_from_env(); + let num_aggr_partitions = num_aggr_partitions_from_env(); + let cases = [ + ("int_low_cardinality", SchemaLayout::IntOnly, 1024), + ("int_high_cardinality", SchemaLayout::IntOnly, num_rows / 2), + ( + "mixed_string_view_low_cardinality", + SchemaLayout::MixedStringView, + 1024, + ), + ( + "mixed_string_view_high_cardinality", + SchemaLayout::MixedStringView, + num_rows / 2, + ), + ]; + + for (case_name, layout, num_groups) in cases { + let input = GroupByInput::new(layout, num_rows, num_groups); + group.throughput(Throughput::Elements(input.num_rows as u64)); + + group.bench_with_input( + BenchmarkId::new("single", case_name), + &input, + |bencher, input| bencher.iter(|| bench_single(input)), + ); + group.bench_with_input( + BenchmarkId::new("reuse_clear_shrink", case_name), + &input, + |bencher, input| { + bencher.iter(|| bench_reuse_clear_shrink(input, num_aggr_partitions)); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_hash_group_by_reuse); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs b/datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs index c3e4f831c4bbf..f1b78ccdd820e 100644 --- a/datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs +++ b/datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; use std::sync::Arc; use arrow::datatypes::SchemaRef; @@ -23,6 +24,8 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::EmitTo; use crate::aggregates::AggregateExec; +use crate::aggregates::group_values::new_group_values; +use crate::aggregates::order::GroupOrdering; use super::common::{ AggregateHashTable, AggregateHashTableBuffer, AggregateHashTableState, FinalMarker, @@ -46,6 +49,36 @@ impl AggregateHashTable { ) } + pub(in crate::aggregates) fn empty_like(&self) -> Result { + let AggregateHashTableState::Building(state) = &self.state else { + return internal_err!( + "cannot create empty final hash aggregate table from non-building state" + ); + }; + + let group_schema = state.group_by.group_schema(&self.input_schema)?; + let group_values = new_group_values(group_schema, &GroupOrdering::None)?; + let accumulators = state + .accumulators + .iter() + .map(|acc| acc.empty_like()) + .collect::>>()?; + + Ok(AggregateHashTable { + group_by_metrics: self.group_by_metrics.clone(), + input_schema: Arc::clone(&self.input_schema), + output_schema: Arc::clone(&self.output_schema), + batch_size: self.batch_size, + state: AggregateHashTableState::Building(AggregateHashTableBuffer { + group_by: Arc::clone(&state.group_by), + group_values, + batch_group_indices: Default::default(), + accumulators, + }), + _mode: PhantomData, + }) + } + /// Emits the next batch of aggregated group keys and final aggregate values. /// /// The output batch size is determined by `self.batch_size`. diff --git a/datafusion/physical-plan/src/aggregates/hash_aggregate.rs b/datafusion/physical-plan/src/aggregates/hash_aggregate.rs index 4c8756c0e865c..97697e67298d1 100644 --- a/datafusion/physical-plan/src/aggregates/hash_aggregate.rs +++ b/datafusion/physical-plan/src/aggregates/hash_aggregate.rs @@ -25,28 +25,114 @@ //! //! See issue for details: +use std::collections::BTreeMap; use std::ops::ControlFlow; use std::sync::Arc; use std::task::{Context, Poll}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::Result; +use datafusion_common::utils::memory::get_record_batch_memory_size; +use datafusion_common::{DataFusionError, Result, internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use futures::stream::{Stream, StreamExt}; -use super::AggregateExec; use super::aggregate_hash_table::{ AggregateHashTable, FinalMarker, PartialMarker, PartialSkipMarker, }; use super::skip_partial::SkipAggregationProbe; +use super::{AggregateExec, partition_runs}; use crate::metrics::{ BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput, SpillMetrics, }; use crate::stream::EmptyRecordBatchStream; use crate::{InputOrderMode, RecordBatchStream, SendableRecordBatchStream, metrics}; +struct BucketStream { + schema: SchemaRef, + iter: std::vec::IntoIter, +} + +impl Stream for BucketStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(self.get_mut().iter.next().map(Ok)) + } +} + +impl RecordBatchStream for BucketStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +struct FinalPartitionRunState { + runs: BTreeMap>, + run_sizes: BTreeMap, + total_runs_size: usize, + replaying_run_size: usize, + is_draining: bool, +} + +impl FinalPartitionRunState { + fn new() -> Self { + Self { + runs: BTreeMap::new(), + run_sizes: BTreeMap::new(), + total_runs_size: 0, + replaying_run_size: 0, + is_draining: false, + } + } + + fn total_size(&self) -> usize { + self.total_runs_size + self.replaying_run_size + } + + fn stage_batch(&mut self, batch: RecordBatch, partition_id: usize) -> usize { + let batch_size = get_record_batch_memory_size(&batch); + self.runs.entry(partition_id).or_default().push(batch); + *self.run_sizes.entry(partition_id).or_default() += batch_size; + self.total_runs_size += batch_size; + batch_size + } + + fn begin_replay(&mut self) { + self.is_draining = true; + } + + fn finish_replaying_run(&mut self) { + self.replaying_run_size = 0; + } + + fn next_partition_id(&self) -> Option { + self.runs + .first_key_value() + .map(|(partition_id, _)| *partition_id) + } + + fn take_run(&mut self, partition_id: usize) -> Result> { + let runs = self.runs.remove(&partition_id).ok_or_else(|| { + DataFusionError::Internal(format!( + "Missing buffered runs for final aggregate partition {partition_id}" + )) + })?; + let run_size = self.run_sizes.remove(&partition_id).ok_or_else(|| { + DataFusionError::Internal(format!( + "Missing buffered run size for final aggregate partition {partition_id}" + )) + })?; + self.total_runs_size -= run_size; + self.replaying_run_size = run_size; + Ok(runs) + } +} + /// Hash aggregation is implemented in two stages: partial and final. This /// stream implements the partial stage. /// @@ -201,9 +287,15 @@ pub(crate) struct FinalHashAggregateStream { /// Memory reservation for group keys and accumulators. reservation: MemoryReservation, + /// Buffered final-partitioned runs replayed one aggregate partition at a time. + partition_run_state: Option, + /// See comments for the same variable in [`PartialHashAggregateStream`]. group_values_soft_limit: Option, + /// Template used to create a fresh hash table for each replayed partition run. + hash_table_template: Option>, + /// Tracks the high-level stream lifecycle. The hash table owns the lower-level /// state for emitting output batches. state: Option, @@ -261,10 +353,6 @@ impl FinalHashAggregateState { hash_table: self.into_hash_table(), } } - - fn into_done(self) -> Self { - Self::Done - } } impl PartialHashAggregateStream { @@ -772,23 +860,203 @@ impl FinalHashAggregateStream { let reservation = MemoryConsumer::new(format!("FinalHashAggregateStream[{partition}]")) .register(context.memory_pool()); + let uses_partition_runs = agg.mode == super::AggregateMode::FinalPartitioned; + let hash_table_template = uses_partition_runs + .then(|| hash_table.empty_like()) + .transpose()?; Ok(Self { schema, input, baseline_metrics, reservation, + partition_run_state: uses_partition_runs.then(FinalPartitionRunState::new), group_values_soft_limit: agg.limit_options().map(|config| config.limit()), + hash_table_template, state: Some(FinalHashAggregateState::ReadingInput { hash_table }), }) } /// See comments in [`Self::group_values_soft_limit`] for details. fn hit_soft_group_limit(&self, hash_table: &AggregateHashTable) -> bool { + if self + .partition_run_state + .as_ref() + .is_some_and(|state| state.is_draining || !state.runs.is_empty()) + { + return false; + } + self.group_values_soft_limit .is_some_and(|limit| limit <= hash_table.building_group_count()) } + fn should_buffer_partition_runs(&self) -> bool { + self.partition_run_state + .as_ref() + .is_some_and(|state| !state.is_draining) + } + + fn stage_partition_runs(&mut self, batch: &RecordBatch) -> Result> { + let Some(runs) = partition_runs(batch.schema_ref())? else { + if self + .partition_run_state + .as_ref() + .is_some_and(|state| !state.runs.is_empty()) + { + return internal_err!( + "missing partition run metadata for final partitioned aggregation after buffered runs have started" + ); + } + return Ok(None); + }; + + let mut offset = 0; + let mut staged_memory = 0; + for run in runs { + let run_batch = batch.slice(offset, run.len); + offset += run.len; + staged_memory += self + .partition_run_state + .as_mut() + .map(|state| state.stage_batch(run_batch, run.relative_partition)) + .unwrap_or(0); + } + Ok(Some(staged_memory)) + } + + fn reserve_staged_partition_runs( + &mut self, + batch_memory: usize, + hash_table: &AggregateHashTable, + ) -> Result<()> { + if batch_memory == 0 { + return Ok(()); + } + + let total_buffered_size = self + .partition_run_state + .as_ref() + .map(FinalPartitionRunState::total_size) + .unwrap_or(0); + + if total_buffered_size == batch_memory { + return self.update_memory_reservation(Some(hash_table)); + } + + self.reservation.try_grow(batch_memory)?; + Ok(()) + } + + fn begin_partition_run_replay( + &mut self, + hash_table: AggregateHashTable, + ) -> Result { + if let Some(state) = self.partition_run_state.as_mut() { + state.begin_replay(); + } + + self.load_next_partition_run(hash_table) + } + + fn finish_partition_run_replay( + &mut self, + mut hash_table: AggregateHashTable, + ) -> Result { + self.start_output(&mut hash_table)?; + + if let Some(state) = self.partition_run_state.as_mut() { + state.finish_replaying_run(); + } + + self.update_memory_reservation(Some(&hash_table))?; + Ok(FinalHashAggregateState::ProducingOutput { hash_table }) + } + + fn fresh_partition_run_hash_table(&self) -> Result> { + self.hash_table_template + .as_ref() + .ok_or_else(|| { + DataFusionError::Internal( + "missing final aggregate hash table template".to_string(), + ) + }) + .and_then(AggregateHashTable::empty_like) + } + + fn next_state_after_partition_output(&mut self) -> Result { + if !self + .partition_run_state + .as_ref() + .is_some_and(|state| state.is_draining) + { + return Ok(FinalHashAggregateState::Done); + } + + let hash_table = self.fresh_partition_run_hash_table()?; + self.load_next_partition_run(hash_table) + } + + fn load_next_partition_run( + &mut self, + hash_table: AggregateHashTable, + ) -> Result { + let next_partition_id = self + .partition_run_state + .as_ref() + .and_then(FinalPartitionRunState::next_partition_id); + + if let Some(partition_id) = next_partition_id { + let runs = self + .partition_run_state + .as_mut() + .ok_or_else(|| { + DataFusionError::Internal( + "missing partition run state while replaying final aggregate runs" + .to_string(), + ) + })? + .take_run(partition_id)?; + let schema = runs.first().map(|batch| batch.schema()).ok_or_else(|| { + DataFusionError::Internal(format!( + "Buffered runs for final aggregate partition {partition_id} were unexpectedly empty" + )) + })?; + self.input = Box::pin(BucketStream { + schema, + iter: runs.into_iter(), + }); + self.update_memory_reservation(Some(&hash_table))?; + Ok(FinalHashAggregateState::ReadingInput { hash_table }) + } else { + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + self.update_memory_reservation(None)?; + Ok(FinalHashAggregateState::Done) + } + } + + fn update_memory_reservation( + &mut self, + hash_table: Option<&AggregateHashTable>, + ) -> Result<()> { + let hash_table_size = hash_table + .map(AggregateHashTable::memory_size) + .or_else(|| { + self.state + .as_ref() + .map(|state| state.hash_table().memory_size()) + }) + .unwrap_or(0); + let partition_runs_size = self + .partition_run_state + .as_ref() + .map(FinalPartitionRunState::total_size) + .unwrap_or(0); + self.reservation + .try_resize(hash_table_size + partition_runs_size) + } + fn start_output( &mut self, hash_table: &mut AggregateHashTable, @@ -798,6 +1066,45 @@ impl FinalHashAggregateStream { hash_table.start_output() } + // FinalPartitioned inputs may arrive as coalesced batches with partition run + // metadata. The rows are already grouped by relative aggregate partition inside + // each batch, but multiple relative partitions can share the same output stream. + // + // Original input stream: + // + // batch A rows: [ p0 ][ p1 ] metadata: [(p0, len), (p1, len)] + // batch B rows: [ p0 ][ p1 ] metadata: [(p0, len), (p1, len)] + // + // Buffer by relative aggregate partition while reading the original stream: + // + // FinalPartitionRunState.runs + // +----+-----------------------------+ + // | p0 | [ A.p0 slice, B.p0 slice ] | + // | p1 | [ A.p1 slice, B.p1 slice ] | + // +----+-----------------------------+ + // + // Replay after the original stream ends: + // + // p0 slices -> fresh final hash table -> output p0 groups + // p1 slices -> fresh final hash table -> output p1 groups + // + // This keeps equal group keys from different relative aggregate partitions from + // being merged together, while still merging the same group key within one + // relative aggregate partition. + // + // State transitions: + // + // Reading original input + // + metadata -> buffer slices, keep ReadingInput + // + no metadata -> normal aggregate_batch path + // + input done -> begin_partition_run_replay(...) + // + // Reading BucketStream(pN) + // + input done -> finish_partition_run_replay(...) -> ProducingOutput(pN) + // + // ProducingOutput(pN) + // + output done -> load_next_partition_run(fresh table) or Done + // /// Handle ReadingInput state - aggregate partial state batches into the hash table. /// /// See comments at `poll_next()` for details. @@ -819,6 +1126,44 @@ impl FinalHashAggregateStream { Poll::Ready(Some(Ok(batch))) => { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); + + if self.should_buffer_partition_runs() { + if original_state.hash_table().building_group_count() != 0 { + timer.done(); + return ControlFlow::Break(( + Poll::Ready(Some(internal_err!( + "cannot switch final partitioned aggregation to buffered runs after groups have already been accumulated" + ))), + original_state, + )); + } + + match self.stage_partition_runs(&batch) { + Ok(Some(batch_memory)) => { + let result = self.reserve_staged_partition_runs( + batch_memory, + original_state.hash_table(), + ); + timer.done(); + if let Err(e) = result { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + return ControlFlow::Continue(original_state); + } + Ok(None) => {} + Err(e) => { + timer.done(); + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + } + } + let result = original_state.hash_table_mut().aggregate_batch(&batch); timer.done(); @@ -844,9 +1189,8 @@ impl FinalHashAggregateStream { return ControlFlow::Continue(original_state.into_producing_output()); } - if let Err(e) = self - .reservation - .try_resize(original_state.hash_table().memory_size()) + if let Err(e) = + self.update_memory_reservation(Some(original_state.hash_table())) { return ControlFlow::Break(( Poll::Ready(Some(Err(e))), @@ -862,16 +1206,25 @@ impl FinalHashAggregateStream { Poll::Ready(None) => { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); - let result = self.start_output(original_state.hash_table_mut()); + let result = match self.partition_run_state.as_ref() { + Some(state) if state.is_draining => { + self.finish_partition_run_replay(original_state.into_hash_table()) + } + Some(state) if !state.runs.is_empty() => { + self.begin_partition_run_replay(original_state.into_hash_table()) + } + _ => self + .start_output(original_state.hash_table_mut()) + .map(|()| original_state.into_producing_output()), + }; timer.done(); match result { - Ok(()) => { - ControlFlow::Continue(original_state.into_producing_output()) - } - Err(e) => { - ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) - } + Ok(next_state) => ControlFlow::Continue(next_state), + Err(e) => ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + FinalHashAggregateState::Done, + )), } } } @@ -899,13 +1252,20 @@ impl FinalHashAggregateStream { match result { Ok(Some(batch)) => { - let _ = self - .reservation - .try_resize(original_state.hash_table().memory_size()); debug_assert!(batch.num_rows() > 0); let next_state = if original_state.hash_table().is_done() { - original_state.into_done() + match self.next_state_after_partition_output() { + Ok(next_state) => next_state, + Err(e) => { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + FinalHashAggregateState::Done, + )); + } + } } else { + let _ = + self.update_memory_reservation(Some(original_state.hash_table())); original_state }; @@ -916,7 +1276,7 @@ impl FinalHashAggregateStream { } Ok(None) => { let _ = self.reservation.try_resize(0); - ControlFlow::Continue(original_state.into_done()) + ControlFlow::Continue(FinalHashAggregateState::Done) } Err(e) => ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)), } @@ -1007,19 +1367,109 @@ mod tests { use std::sync::Arc; use super::*; - use crate::aggregates::{AggregateMode, PhysicalGroupBy}; + use crate::aggregates::{ + AggregateMode, PartitionRun, PhysicalGroupBy, set_partition_runs_metadata, + }; use crate::execution_plan::ExecutionPlan; use crate::test::TestMemoryExec; - use arrow::array::{Int32Array, Int64Array}; + use arrow::array::{ArrayRef, Int32Array, Int64Array}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_functions_aggregate::count::count_udaf; + use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::col; use futures::StreamExt; + // Covers final partitioned hash aggregation replaying coalesced partition + // runs one relative aggregate partition at a time. + // Example: interleaved runs for partitions 0 and 1 are aggregated separately. + #[tokio::test] + async fn test_final_hash_stream_replays_partition_runs() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("group_col", DataType::Int32, false), + Field::new("value", DataType::Int64, false), + ])); + + let make_batch = |groups: Vec, values: Vec| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(groups)) as ArrayRef, + Arc::new(Int64Array::from(values)) as ArrayRef, + ], + ) + }; + + let input_batches = vec![ + set_partition_runs_metadata( + make_batch(vec![1, 2, 1], vec![2, 5, 11])?, + &[PartitionRun::new(0, 2)?, PartitionRun::new(1, 1)?], + )?, + set_partition_runs_metadata( + make_batch(vec![1, 3, 1, 3], vec![3, 7, 1, 9])?, + &[PartitionRun::new(0, 2)?, PartitionRun::new(1, 2)?], + )?, + ]; + + let input = + TestMemoryExec::try_new_exec(&[input_batches], Arc::clone(&schema), None)?; + let input = + Arc::new(TestMemoryExec::update_cache(&input)) as Arc; + + let group_by = PhysicalGroupBy::new_single(vec![( + col("group_col", &schema)?, + "group_col".to_string(), + )]); + let aggr_expr = vec![Arc::new( + AggregateExprBuilder::new(sum_udaf(), vec![col("value", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("SUM(value)") + .build()?, + )]; + let aggregate_exec = AggregateExec::try_new( + AggregateMode::FinalPartitioned, + group_by, + aggr_expr, + vec![None], + input, + Arc::clone(&schema), + )?; + + let task_ctx = Arc::new(TaskContext::default()); + let mut stream = FinalHashAggregateStream::new(&aggregate_exec, &task_ctx, 0)?; + let mut actual = Vec::new(); + while let Some(batch) = stream.next().await { + let batch = batch?; + let groups = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("group column should be Int32"); + let sums = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("sum column should be Int64"); + for row in 0..batch.num_rows() { + actual.push((groups.value(row), sums.value(row))); + } + } + actual.sort_unstable(); + + assert_eq!(actual, vec![(1, 5), (1, 12), (2, 5), (3, 7), (3, 9)]); + assert!( + stream + .partition_run_state + .as_ref() + .is_some_and(|state| state.runs.is_empty() && state.is_draining) + ); + + Ok(()) + } + #[tokio::test] async fn test_partial_hash_stream_double_emission_race_condition_bug() -> Result<()> { // Fix for https://github.com/apache/datafusion/issues/18701 diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4f5b893578d74..5e43ca69b72cc 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -69,6 +69,9 @@ use datafusion_physical_expr_common::sort_expr::{ use datafusion_expr::utils::AggregateOrderSensitivity; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use itertools::Itertools; +pub(crate) use partition_runs::{ + PartitionRun, partition_runs, set_partition_runs_metadata, +}; use topk::hash_table::is_supported_hash_key_type; use topk::heap::is_supported_heap_type; @@ -77,6 +80,7 @@ pub mod group_values; mod hash_aggregate; mod no_grouping; pub mod order; +mod partition_runs; mod row_hash; mod skip_partial; mod topk; diff --git a/datafusion/physical-plan/src/aggregates/partition_runs.rs b/datafusion/physical-plan/src/aggregates/partition_runs.rs new file mode 100644 index 0000000000000..d21d7d9b3626a --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/partition_runs.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata helpers for hash aggregate repartition runs. +//! +//! This module does NOT perform repartitioning or aggregation. It only owns the +//! schema metadata contract used to describe contiguous relative aggregate +//! partition runs inside a coalesced record batch. +//! +//! Entry points: [`set_partition_runs_metadata`] and [`partition_runs`]. + +use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Result, internal_err}; + +const PARTITION_RUN_SEPARATOR: char = ','; +const PARTITION_RUN_FIELD_SEPARATOR: char = ':'; + +pub(crate) const AGGR_PARTITION_RUNS_METADATA_KEY: &str = + "datafusion.internal.hash_aggr_partition_runs"; + +/// A contiguous run of rows for one relative aggregate partition. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct PartitionRun { + /// Relative aggregate partition inside one output partition. + pub relative_partition: usize, + /// Number of contiguous rows in this run. + pub len: usize, +} + +impl PartitionRun { + /// Create a non-empty partition run. + pub(crate) fn new(relative_partition: usize, len: usize) -> Result { + if len == 0 { + return internal_err!("Hash aggregate partition run length must be nonzero"); + } + + Ok(Self { + relative_partition, + len, + }) + } +} + +/// Return partition runs encoded on `schema`, if present. +pub(crate) fn partition_runs(schema: &Schema) -> Result>> { + schema + .metadata() + .get(AGGR_PARTITION_RUNS_METADATA_KEY) + .map(|value| decode_partition_runs(value)) + .transpose() +} + +/// Encode partition runs as schema metadata value. +pub(crate) fn encode_partition_runs(runs: &[PartitionRun]) -> String { + runs.iter() + .map(|run| format!("{}:{}", run.relative_partition, run.len)) + .collect::>() + .join(",") +} + +/// Decode partition runs from schema metadata value. +pub(crate) fn decode_partition_runs(value: &str) -> Result> { + if value.is_empty() { + return Ok(vec![]); + } + + value + .split(PARTITION_RUN_SEPARATOR) + .map(decode_partition_run) + .collect() +} + +/// Set partition runs metadata on `batch`. +pub(crate) fn set_partition_runs_metadata( + mut batch: RecordBatch, + runs: &[PartitionRun], +) -> Result { + validate_partition_runs(runs, batch.num_rows())?; + let value = encode_partition_runs(runs); + debug_assert_eq!(decode_partition_runs(&value)?, runs); + batch + .schema_metadata_mut() + .insert(AGGR_PARTITION_RUNS_METADATA_KEY.to_string(), value); + Ok(batch) +} + +fn decode_partition_run(value: &str) -> Result { + let Some((relative_partition, len)) = value.split_once(PARTITION_RUN_FIELD_SEPARATOR) + else { + return internal_err!( + "Invalid hash aggregate partition run metadata entry '{value}'" + ); + }; + + let relative_partition = relative_partition.parse::().map_err(|err| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid hash aggregate partition run partition '{relative_partition}': {err}" + )) + })?; + let len = len.parse::().map_err(|err| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid hash aggregate partition run length '{len}': {err}" + )) + })?; + + PartitionRun::new(relative_partition, len) +} + +fn validate_partition_runs(runs: &[PartitionRun], num_rows: usize) -> Result<()> { + if num_rows == 0 { + if runs.is_empty() { + return Ok(()); + } + return internal_err!( + "Hash aggregate partition runs metadata must be empty for empty batches" + ); + } + + if runs.is_empty() { + return internal_err!( + "Hash aggregate partition runs metadata must not be empty for non-empty batches" + ); + } + + let total_rows: usize = runs.iter().map(|run| run.len).sum(); + if total_rows != num_rows { + return internal_err!( + "Hash aggregate partition runs contain {total_rows} rows, expected {num_rows}" + ); + } + + if runs.iter().any(|run| run.len == 0) { + return internal_err!("Hash aggregate partition run length must be nonzero"); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; + + use super::*; + + // Covers encoding and decoding multiple relative aggregate partition runs. + // Example: runs [(0, 3), (2, 4)] are encoded as "0:3,2:4". + #[test] + fn test_partition_runs_encode_decode_multiple_runs() -> Result<()> { + let runs = vec![PartitionRun::new(0, 3)?, PartitionRun::new(2, 4)?]; + + let encoded = encode_partition_runs(&runs); + assert_eq!(encoded, "0:3,2:4"); + assert_eq!(decode_partition_runs(&encoded)?, runs); + + Ok(()) + } + + // Covers attaching partition runs to record batch schema metadata. + // Example: a 3-row batch with run [(1, 3)] stores metadata "1:3". + #[test] + fn test_set_partition_runs_metadata_sets_batch_schema_metadata() -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])); + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(UInt32Array::from(vec![1, 2, 3]))], + )?; + let runs = vec![PartitionRun::new(1, 3)?]; + + let batch = set_partition_runs_metadata(batch, &runs)?; + + assert_eq!(decode_partition_runs(&encode_partition_runs(&runs))?, runs); + assert_eq!( + batch + .schema_ref() + .metadata() + .get(AGGR_PARTITION_RUNS_METADATA_KEY), + Some(&"1:3".to_string()) + ); + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2298183485f55..0ae996cd1b3e5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -19,6 +19,7 @@ //! partitions to M output partitions based on a partitioning scheme, optionally //! maintaining the order of the input rows in the output. +use std::collections::VecDeque; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; @@ -31,6 +32,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; +use crate::aggregates::{PartitionRun, partition_runs, set_partition_runs_metadata}; use crate::coalesce::LimitedBatchCoalescer; use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::hash_utils::create_hashes; @@ -46,9 +48,18 @@ use crate::{ check_if_same_properties, }; -use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; -use arrow::compute::take_arrays; -use arrow::datatypes::{SchemaRef, UInt32Type}; +use arrow::array::{ + Array, ArrayRef, AsArray, PrimitiveArray, RecordBatch, RecordBatchOptions, make_array, +}; +use arrow::compute::{concat_batches, take_arrays}; +use arrow::datatypes::{ + ArrowNativeType, BinaryType, ByteArrayType, DataType, LargeBinaryType, LargeUtf8Type, + SchemaRef, UInt32Type, Utf8Type, +}; +use arrow_data::{ + ArrayData, + transform::{Capacities, MutableArrayData}, +}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::utils::transpose; @@ -166,9 +177,13 @@ struct OutputChannel { } impl OutputChannel { - fn coalesce(&mut self, batch: RecordBatch) -> Result> { + fn coalesce( + &mut self, + relative_partition: usize, + batch: RecordBatch, + ) -> Result> { match &self.shared_coalescer { - Some(shared) => Ok(shared.push_and_drain(batch)?), + Some(shared) => Ok(shared.push_and_drain(relative_partition, batch)?), None => Ok(vec![batch]), } } @@ -224,17 +239,22 @@ impl OutputChannel { /// Cheap to [`Clone`]: both fields are [`Arc`]s. #[derive(Clone)] struct SharedCoalescer { - inner: Arc>, + inner: Arc>, active_senders: Arc, } impl SharedCoalescer { - fn new(schema: SchemaRef, target_batch_size: usize, num_senders: usize) -> Self { + fn new( + schema: &SchemaRef, + target_batch_size: usize, + num_senders: usize, + max_aggr_partition_factor: usize, + ) -> Self { Self { - inner: Arc::new(Mutex::new(LimitedBatchCoalescer::new( + inner: Arc::new(Mutex::new(OutputPartitionCoalescer::new( schema, target_batch_size, - None, + max_aggr_partition_factor, ))), active_senders: Arc::new(AtomicUsize::new(num_senders)), } @@ -242,12 +262,16 @@ impl SharedCoalescer { /// Push `batch` into the coalescer and drain any newly completed /// batches. The mutex is held only briefly. - fn push_and_drain(&self, batch: RecordBatch) -> Result> { + fn push_and_drain( + &self, + relative_partition: usize, + batch: RecordBatch, + ) -> Result> { let mut acc = Vec::new(); - let mut c = self.inner.lock(); - c.push_batch(batch)?; - while let Some(b) = c.next_completed_batch() { - acc.push(b); + let mut coalescer = self.inner.lock(); + coalescer.push_batch(relative_partition, batch)?; + while let Some(batch) = coalescer.next_completed_batch() { + acc.push(batch); } Ok(acc) } @@ -261,15 +285,384 @@ impl SharedCoalescer { return Ok(vec![]); } let mut acc = Vec::new(); - let mut c = self.inner.lock(); - c.finish()?; - while let Some(b) = c.next_completed_batch() { - acc.push(b); + let mut coalescer = self.inner.lock(); + coalescer.finish()?; + while let Some(batch) = coalescer.next_completed_batch() { + acc.push(batch); } Ok(acc) } } +struct OutputPartitionCoalescer { + inner: OutputPartitionCoalescerInner, +} + +enum OutputPartitionCoalescerInner { + Simple(LimitedBatchCoalescer), + HashAggregate(HashAggregateOutputPartitionCoalescer), +} + +impl OutputPartitionCoalescer { + fn new( + schema: &SchemaRef, + target_batch_size: usize, + max_aggr_partition_factor: usize, + ) -> Self { + let inner = if max_aggr_partition_factor <= 1 { + OutputPartitionCoalescerInner::Simple(LimitedBatchCoalescer::new( + Arc::clone(schema), + target_batch_size, + None, + )) + } else { + OutputPartitionCoalescerInner::HashAggregate( + HashAggregateOutputPartitionCoalescer::new( + schema, + target_batch_size, + max_aggr_partition_factor, + ), + ) + }; + + Self { inner } + } + + fn push_batch( + &mut self, + relative_partition: usize, + batch: RecordBatch, + ) -> Result<()> { + match &mut self.inner { + OutputPartitionCoalescerInner::Simple(coalescer) => { + debug_assert_eq!(relative_partition, 0); + coalescer.push_batch(batch)?; + } + OutputPartitionCoalescerInner::HashAggregate(coalescer) => { + debug_assert_eq!(relative_partition, 0); + coalescer.push_partitioned_batch(batch)?; + } + } + Ok(()) + } + + fn finish(&mut self) -> Result<()> { + match &mut self.inner { + OutputPartitionCoalescerInner::Simple(coalescer) => coalescer.finish(), + OutputPartitionCoalescerInner::HashAggregate(coalescer) => coalescer.finish(), + } + } + + fn next_completed_batch(&mut self) -> Option { + match &mut self.inner { + OutputPartitionCoalescerInner::Simple(coalescer) => { + coalescer.next_completed_batch() + } + OutputPartitionCoalescerInner::HashAggregate(coalescer) => { + coalescer.next_completed_batch() + } + } + } +} + +#[derive(Debug)] +struct BufferedPartitionRange { + batch: Arc, + offset: usize, + len: usize, +} + +#[derive(Debug)] +struct IndexedPartitionRange { + batch_idx: usize, + offset: usize, + len: usize, +} + +pub struct HashAggregateOutputPartitionCoalescer { + schema: SchemaRef, + target_batch_size: usize, + buffers: Vec>, + buffered_rows: usize, + completed: VecDeque, + finished: bool, +} + +impl HashAggregateOutputPartitionCoalescer { + pub fn new( + schema: &SchemaRef, + target_batch_size: usize, + max_aggr_partition_factor: usize, + ) -> Self { + Self { + schema: Arc::clone(schema), + target_batch_size: target_batch_size.max(1), + buffers: (0..max_aggr_partition_factor).map(|_| vec![]).collect(), + buffered_rows: 0, + completed: VecDeque::new(), + finished: false, + } + } + + pub fn push_batch( + &mut self, + relative_partition: usize, + batch: RecordBatch, + ) -> Result<()> { + self.push_checked_batch()?; + self.push_batch_ranges(batch, |batch| { + Ok(vec![PartitionRun::new( + relative_partition, + batch.num_rows(), + )?]) + }) + } + + pub fn push_partitioned_batch(&mut self, batch: RecordBatch) -> Result<()> { + self.push_checked_batch()?; + self.push_batch_ranges(batch, |batch| { + partition_runs(batch.schema_ref())?.ok_or_else(|| { + DataFusionError::Internal( + "Hash aggregate partitioned batch missing partition run metadata" + .to_string(), + ) + }) + }) + } + + pub fn finish(&mut self) -> Result<()> { + if self.finished { + return Ok(()); + } + self.flush_buffered()?; + self.finished = true; + Ok(()) + } + + pub fn next_completed_batch(&mut self) -> Option { + self.completed.pop_front() + } + + fn push_checked_batch(&self) -> Result<()> { + if self.finished { + return internal_err!( + "HashAggregateOutputPartitionCoalescer: cannot push batch after finish" + ); + } + Ok(()) + } + + fn push_batch_ranges( + &mut self, + batch: RecordBatch, + runs: impl FnOnce(&RecordBatch) -> Result>, + ) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + let runs = runs(&batch)?; + let batch = Arc::new(batch); + let mut offset = 0; + for run in runs { + if run.relative_partition >= self.buffers.len() { + return internal_err!( + "Hash aggregate repartition produced relative partition {}, expected less than {}", + run.relative_partition, + self.buffers.len() + ); + } + + self.push_range(&batch, run.relative_partition, offset, run.len)?; + offset += run.len; + } + debug_assert_eq!(offset, batch.num_rows()); + Ok(()) + } + + fn push_range( + &mut self, + batch: &Arc, + relative_partition: usize, + mut offset: usize, + mut len: usize, + ) -> Result<()> { + while len > 0 { + let num_free_rows = self.target_batch_size.saturating_sub(self.buffered_rows); + if num_free_rows == 0 { + self.flush_buffered()?; + continue; + } + + let range_len = len.min(num_free_rows); + self.buffer_range(batch, relative_partition, offset, range_len); + if self.buffered_rows == self.target_batch_size { + self.flush_buffered()?; + } + + offset += range_len; + len -= range_len; + } + Ok(()) + } + + fn buffer_range( + &mut self, + batch: &Arc, + relative_partition: usize, + offset: usize, + len: usize, + ) { + debug_assert!(len > 0); + self.buffered_rows += len; + self.buffers[relative_partition].push(BufferedPartitionRange { + batch: Arc::clone(batch), + offset, + len, + }); + } + + fn flush_buffered(&mut self) -> Result<()> { + if self.buffered_rows == 0 { + return Ok(()); + } + + let mut runs = Vec::new(); + let mut output_ranges = Vec::new(); + let partitions = self.buffers.iter_mut().enumerate(); + for (relative_partition, partition_ranges) in partitions { + let partition_rows = partition_ranges + .iter() + .map(|range| range.len) + .sum::(); + if partition_rows == 0 { + continue; + } + + runs.push(PartitionRun::new(relative_partition, partition_rows)?); + output_ranges.append(partition_ranges); + } + + let batch = + concat_batch_ranges(&self.schema, &output_ranges, self.buffered_rows)?; + debug_assert_eq!(batch.num_rows(), self.buffered_rows); + let batch = set_partition_runs_metadata(batch, &runs)?; + self.completed.push_back(batch); + self.buffered_rows = 0; + Ok(()) + } +} + +fn concat_batch_ranges( + schema: &SchemaRef, + ranges: &[BufferedPartitionRange], + num_rows: usize, +) -> Result { + if ranges + .iter() + .all(|range| range.offset == 0 && range.len == range.batch.num_rows()) + { + let batches = ranges + .iter() + .map(|range| range.batch.as_ref()) + .collect::>(); + return Ok(concat_batches(schema, batches)?); + } + + let (batches, ranges) = index_partition_ranges(ranges); + let columns = (0..schema.fields().len()) + .map(|column_idx| concat_column_ranges(&batches, &ranges, column_idx, num_rows)) + .collect::>>()?; + + RecordBatch::try_new(Arc::clone(schema), columns).map_err(Into::into) +} + +fn index_partition_ranges( + ranges: &[BufferedPartitionRange], +) -> (Vec>, Vec) { + let mut batches = Vec::new(); + let mut indexed_ranges = Vec::with_capacity(ranges.len()); + + for range in ranges { + let batch_idx = batches + .iter() + .position(|batch| Arc::ptr_eq(batch, &range.batch)) + .unwrap_or_else(|| { + let batch_idx = batches.len(); + batches.push(Arc::clone(&range.batch)); + batch_idx + }); + indexed_ranges.push(IndexedPartitionRange { + batch_idx, + offset: range.offset, + len: range.len, + }); + } + + (batches, indexed_ranges) +} + +fn concat_column_ranges( + batches: &[Arc], + ranges: &[IndexedPartitionRange], + column_idx: usize, + num_rows: usize, +) -> Result { + let input_arrays = batches + .iter() + .map(|batch| batch.column(column_idx).as_ref()) + .collect::>(); + let input_data = input_arrays + .iter() + .map(|array| array.to_data()) + .collect::>(); + let input_refs = input_data.iter().collect::>(); + let capacity = concat_ranges_capacity(&input_arrays, ranges, num_rows); + let mut output = MutableArrayData::with_capacities(input_refs, false, capacity); + + for range in ranges { + output.extend(range.batch_idx, range.offset, range.offset + range.len); + } + + Ok(make_array(output.freeze())) +} + +fn concat_ranges_capacity( + arrays: &[&dyn Array], + ranges: &[IndexedPartitionRange], + num_rows: usize, +) -> Capacities { + match arrays[0].data_type() { + DataType::Utf8 => concat_bytes_ranges_capacity::(arrays, ranges), + DataType::LargeUtf8 => { + concat_bytes_ranges_capacity::(arrays, ranges) + } + DataType::Binary => concat_bytes_ranges_capacity::(arrays, ranges), + DataType::LargeBinary => { + concat_bytes_ranges_capacity::(arrays, ranges) + } + DataType::Utf8View | DataType::BinaryView => Capacities::Array(num_rows), + _ => Capacities::Array(num_rows), + } +} + +fn concat_bytes_ranges_capacity( + arrays: &[&dyn Array], + ranges: &[IndexedPartitionRange], +) -> Capacities { + let mut item_capacity = 0; + let mut bytes_capacity = 0; + for range in ranges { + let array = arrays[range.batch_idx].as_bytes::(); + let offsets = array.value_offsets(); + item_capacity += range.len; + bytes_capacity += offsets[range.offset + range.len].as_usize() + - offsets[range.offset].as_usize(); + } + + Capacities::Binary(item_capacity, Some(bytes_capacity)) +} + /// Channels and resources for a single output partition. /// /// Each output partition has channels to receive data from all input partitions. @@ -396,6 +789,7 @@ impl RepartitionExecState { name: &str, context: &Arc, spill_manager: SpillManager, + max_aggr_partition_factor: usize, ) -> Result<&mut ConsumingInputStreamsState> { let streams_and_metrics = match self { RepartitionExecState::NotInitialized => { @@ -477,9 +871,10 @@ impl RepartitionExecState { // channel and `StreamingMergeBuilder` handles batching. let shared_coalescer = (!preserve_order).then(|| { SharedCoalescer::new( - input.schema(), + &input.schema(), context.session_config().batch_size(), num_input_partitions, + max_aggr_partition_factor, ) }); @@ -534,6 +929,7 @@ impl RepartitionExecState { // preserve_order depends on partition index to start from 0 if preserve_order { 0 } else { i }, num_input_partitions, + max_aggr_partition_factor, )); // In a separate task, wait for each input to be done @@ -566,12 +962,59 @@ enum BatchPartitionerState { hash_buffer: Vec, indices: Vec>, }, + HashAggregate(HashAggregateBatchPartitioner), RoundRobin { num_partitions: usize, next_idx: usize, }, } +/// Partitions partial aggregate output into aggregate subpartitions. +struct HashAggregateBatchPartitioner { + exprs: Vec>, + partition_reducer: StrengthReducedU64, + hash_buffer: Vec, + indices: Vec>, + num_partitions: usize, + max_aggr_partition_factor: usize, +} + +impl HashAggregateBatchPartitioner { + fn new( + exprs: Vec>, + num_partitions: usize, + max_aggr_partition_factor: usize, + ) -> Result { + if num_partitions == 0 { + return internal_err!( + "Hash aggregate repartition requires at least one partition" + ); + } + if max_aggr_partition_factor == 0 { + return internal_err!( + "Hash aggregate repartition requires at least one aggregate partition per output partition" + ); + } + + let max_aggr_partitions = num_partitions + .checked_mul(max_aggr_partition_factor) + .ok_or_else(|| { + DataFusionError::Internal( + "Hash aggregate repartition partition count overflow".to_string(), + ) + })?; + + Ok(Self { + exprs, + partition_reducer: StrengthReducedU64::new(max_aggr_partitions as u64), + hash_buffer: vec![], + indices: vec![vec![]; max_aggr_partitions], + num_partitions, + max_aggr_partition_factor, + }) + } +} + /// Fixed RandomState used for hash repartitioning to ensure consistent behavior across /// executions and runs. pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seed(0); @@ -681,6 +1124,29 @@ impl BatchPartitioner { }) } + /// Create a new [`BatchPartitioner`] for hash aggregation repartitioning. + /// + /// Rows are first split into `num_partitions * max_aggr_partition_factor` + /// aggregate partitions. Each group of `max_aggr_partition_factor` + /// aggregate partitions maps back to one output partition. + pub fn new_hash_aggregate_partitioner( + exprs: Vec>, + num_partitions: usize, + max_aggr_partition_factor: usize, + timer: metrics::Time, + ) -> Result { + Ok(Self { + state: BatchPartitionerState::HashAggregate( + HashAggregateBatchPartitioner::new( + exprs, + num_partitions, + max_aggr_partition_factor, + )?, + ), + timer, + }) + } + /// Create a new [`BatchPartitioner`] for round-robin repartitioning. /// /// # Parameters @@ -788,7 +1254,17 @@ impl BatchPartitioner { &mut self, batch: RecordBatch, ) -> Result> + Send + '_> { - let it: Box> + Send> = + Ok(self + .partition_iter_with_relative_partition(batch)? + .map(|res| res.map(|(partition, _, batch)| (partition, batch)))) + } + + fn partition_iter_with_relative_partition( + &mut self, + batch: RecordBatch, + ) -> Result> + Send + '_> + { + let it: Box> + Send> = match &mut self.state { BatchPartitionerState::RoundRobin { num_partitions, @@ -796,7 +1272,7 @@ impl BatchPartitioner { } => { let idx = *next_idx; *next_idx = (*next_idx + 1) % *num_partitions; - Box::new(std::iter::once(Ok((idx, batch)))) + Box::new(std::iter::once(Ok((idx, 0, batch)))) } BatchPartitionerState::Hash { exprs, @@ -804,30 +1280,28 @@ impl BatchPartitioner { hash_buffer, indices, } => { - // Tracking time required for distributing indexes across output partitions - let timer = self.timer.timer(); - - let arrays = - evaluate_expressions_to_arrays(exprs.as_slice(), &batch)?; - - hash_buffer.clear(); - hash_buffer.resize(batch.num_rows(), 0); - - create_hashes( - &arrays, - REPARTITION_RANDOM_STATE.random_state(), + let partitioned_batches = Self::partition_hash_batch( + &batch, + exprs, + *partition_reducer, hash_buffer, + indices, + &self.timer, + |partition| (partition, 0), )?; - indices.iter_mut().for_each(|v| v.clear()); - - partition_reducer.partition_indices(hash_buffer, indices); - - // Finished building index-arrays for output partitions - timer.done(); - - let partitioned_batches = - Self::partition_grouped_take(&batch, indices, &self.timer)?; + Box::new(partitioned_batches.into_iter()) + } + BatchPartitionerState::HashAggregate(partitioner) => { + let partitioned_batches = Self::partition_hash_aggregate_batch( + &batch, + &partitioner.exprs, + partitioner.partition_reducer, + &mut partitioner.hash_buffer, + &mut partitioner.indices, + &self.timer, + partitioner.max_aggr_partition_factor, + )?; Box::new(partitioned_batches.into_iter()) } @@ -841,6 +1315,9 @@ impl BatchPartitioner { match &self.state { BatchPartitionerState::RoundRobin { num_partitions, .. } => *num_partitions, BatchPartitionerState::Hash { indices, .. } => indices.len(), + BatchPartitionerState::HashAggregate(partitioner) => { + partitioner.num_partitions + } } } @@ -860,11 +1337,158 @@ impl BatchPartitioner { /// /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`. + fn partition_hash_batch( + batch: &RecordBatch, + exprs: &[Arc], + partition_reducer: StrengthReducedU64, + hash_buffer: &mut Vec, + indices: &mut [Vec], + timer: &metrics::Time, + mut map_partition: impl FnMut(usize) -> (usize, usize), + ) -> Result>> { + let partition_timer = timer.timer(); + + let arrays = evaluate_expressions_to_arrays(exprs, batch)?; + + hash_buffer.clear(); + hash_buffer.resize(batch.num_rows(), 0); + + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + hash_buffer, + )?; + + indices.iter_mut().for_each(|values| values.clear()); + + partition_reducer.partition_indices(hash_buffer, indices); + + partition_timer.done(); + + Self::partition_grouped_take(batch, indices, timer, |partition| { + map_partition(partition) + }) + } + + fn partition_hash_aggregate_batch( + batch: &RecordBatch, + exprs: &[Arc], + partition_reducer: StrengthReducedU64, + hash_buffer: &mut Vec, + indices: &mut [Vec], + timer: &metrics::Time, + max_aggr_partition_factor: usize, + ) -> Result>> { + let partition_timer = timer.timer(); + + let arrays = evaluate_expressions_to_arrays(exprs, batch)?; + + hash_buffer.clear(); + hash_buffer.resize(batch.num_rows(), 0); + + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + hash_buffer, + )?; + + indices.iter_mut().for_each(|values| values.clear()); + + partition_reducer.partition_indices(hash_buffer, indices); + + partition_timer.done(); + + Self::partition_hash_aggregate_grouped_take( + batch, + indices, + timer, + max_aggr_partition_factor, + ) + } + + fn partition_hash_aggregate_grouped_take( + batch: &RecordBatch, + indices: &mut [Vec], + timer: &metrics::Time, + max_aggr_partition_factor: usize, + ) -> Result>> { + let mut output_ranges = Vec::with_capacity(indices.len()); + let mut reordered_indices = Vec::with_capacity(batch.num_rows()); + + for (aggr_partition, p_indices) in indices.iter_mut().enumerate() { + if p_indices.is_empty() { + continue; + } + + let start = reordered_indices.len(); + reordered_indices.extend_from_slice(p_indices); + output_ranges.push(( + aggr_partition / max_aggr_partition_factor, + PartitionRun::new( + aggr_partition % max_aggr_partition_factor, + p_indices.len(), + )?, + start, + )); + p_indices.clear(); + } + + if reordered_indices.is_empty() { + return Ok(vec![]); + } + + let batches = { + let _timer = timer.timer(); + let indices_array: PrimitiveArray = reordered_indices.into(); + let columns = take_arrays(batch.columns(), &indices_array, None)?; + + let mut options = RecordBatchOptions::new(); + options = options.with_row_count(Some(indices_array.len())); + let reordered_batch = + RecordBatch::try_new_with_options(batch.schema(), columns, &options)?; + + let mut output_batches = Vec::new(); + let mut current_output_partition = None; + let mut current_start = 0; + let mut current_len = 0; + let mut current_runs = Vec::new(); + + for (output_partition, run, start) in output_ranges { + if current_output_partition != Some(output_partition) { + if let Some(partition) = current_output_partition { + let batch = reordered_batch.slice(current_start, current_len); + let batch = set_partition_runs_metadata(batch, ¤t_runs)?; + output_batches.push(Ok((partition, 0, batch))); + } + + current_output_partition = Some(output_partition); + current_start = start; + current_len = 0; + current_runs.clear(); + } + + current_len += run.len; + current_runs.push(run); + } + + if let Some(partition) = current_output_partition { + let batch = reordered_batch.slice(current_start, current_len); + let batch = set_partition_runs_metadata(batch, ¤t_runs)?; + output_batches.push(Ok((partition, 0, batch))); + } + + output_batches + }; + + Ok(batches) + } + fn partition_grouped_take( batch: &RecordBatch, indices: &mut [Vec], timer: &metrics::Time, - ) -> Result>> { + mut map_partition: impl FnMut(usize) -> (usize, usize), + ) -> Result>> { let mut partition_ranges = Vec::with_capacity(indices.len()); let mut reordered_indices = Vec::with_capacity(batch.num_rows()); @@ -875,7 +1499,13 @@ impl BatchPartitioner { let start = reordered_indices.len(); reordered_indices.extend_from_slice(p_indices); - partition_ranges.push((partition, start, p_indices.len())); + let (output_partition, relative_partition) = map_partition(partition); + partition_ranges.push(( + output_partition, + relative_partition, + start, + p_indices.len(), + )); p_indices.clear(); } @@ -895,8 +1525,12 @@ impl BatchPartitioner { partition_ranges .into_iter() - .map(|(partition, start, len)| { - Ok((partition, reordered_batch.slice(start, len))) + .map(|(partition, relative_partition, start, len)| { + Ok(( + partition, + relative_partition, + reordered_batch.slice(start, len), + )) }) .collect() }; @@ -1047,6 +1681,8 @@ pub struct RepartitionExec { /// Boolean flag to decide whether to preserve ordering. If true means /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. preserve_order: bool, + /// Number of aggregate partitions mapped to one output partition. + max_aggr_partition_factor: usize, /// Cache holding plan properties like equivalences, output partitioning etc. cache: Arc, } @@ -1113,6 +1749,11 @@ impl RepartitionExec { self.preserve_order } + /// Return the number of aggregate partitions per output partition. + pub fn max_aggr_partition_factor(&self) -> usize { + self.max_aggr_partition_factor + } + /// Get name used to display this Exec pub fn name(&self) -> &str { "RepartitionExec" @@ -1146,6 +1787,13 @@ impl DisplayAs for RepartitionExec { if self.preserve_order { write!(f, ", preserve_order=true")?; + } + if self.max_aggr_partition_factor > 1 { + write!( + f, + ", max_aggr_partition_factor={}", + self.max_aggr_partition_factor + )?; } else if input_partition_count <= 1 && self.input.output_ordering().is_some() { @@ -1172,6 +1820,13 @@ impl DisplayAs for RepartitionExec { if self.preserve_order { writeln!(f, "preserve_order={}", self.preserve_order)?; } + if self.max_aggr_partition_factor > 1 { + writeln!( + f, + "max_aggr_partition_factor={}", + self.max_aggr_partition_factor + )?; + } Ok(()) } } @@ -1201,6 +1856,7 @@ impl ExecutionPlan for RepartitionExec { children.swap_remove(0), self.partitioning().clone(), )?; + repartition.max_aggr_partition_factor = self.max_aggr_partition_factor; if self.preserve_order { repartition = repartition.with_preserve_order(); } @@ -1232,6 +1888,7 @@ impl ExecutionPlan for RepartitionExec { let partitioning = self.partitioning().clone(); let metrics = self.metrics.clone(); let preserve_order = self.sort_exprs().is_some(); + let max_aggr_partition_factor = self.max_aggr_partition_factor; let name = self.name().to_owned(); let schema = self.schema(); let schema_captured = Arc::clone(&schema); @@ -1270,6 +1927,7 @@ impl ExecutionPlan for RepartitionExec { &name, &context, spill_manager.clone(), + max_aggr_partition_factor, )?; // now return stream for the specified *output* partition which will @@ -1530,6 +2188,7 @@ impl ExecutionPlan for RepartitionExec { state: Arc::clone(&self.state), metrics: self.metrics.clone(), preserve_order: self.preserve_order, + max_aggr_partition_factor: self.max_aggr_partition_factor, cache: new_properties.into(), }))) } @@ -1543,6 +2202,28 @@ impl RepartitionExec { input: Arc, partitioning: Partitioning, ) -> Result { + Self::try_new_with_max_aggr_partition_factor(input, partitioning, 1) + } + + /// Create a repartition operator with aggregate subpartitioning enabled. + pub fn try_new_with_max_aggr_partition_factor( + input: Arc, + partitioning: Partitioning, + max_aggr_partition_factor: usize, + ) -> Result { + if max_aggr_partition_factor == 0 { + return internal_err!( + "RepartitionExec requires max_aggr_partition_factor to be at least one" + ); + } + if max_aggr_partition_factor > 1 + && !matches!(partitioning, Partitioning::Hash(_, _)) + { + return internal_err!( + "Hash aggregate repartition can only be used with hash partitioning" + ); + } + let preserve_order = false; let cache = Self::compute_properties(&input, partitioning, preserve_order); Ok(RepartitionExec { @@ -1550,6 +2231,7 @@ impl RepartitionExec { state: Default::default(), metrics: ExecutionPlanMetricsSet::new(), preserve_order, + max_aggr_partition_factor, cache: Arc::new(cache), }) } @@ -1635,14 +2317,24 @@ impl RepartitionExec { metrics: RepartitionMetrics, input_partition: usize, num_input_partitions: usize, + max_aggr_partition_factor: usize, ) -> Result<()> { let mut partitioner = match &partitioning { Partitioning::Hash(exprs, num_partitions) => { - BatchPartitioner::new_hash_partitioner( - exprs.clone(), - *num_partitions, - metrics.repartition_time.clone(), - )? + if max_aggr_partition_factor > 1 { + BatchPartitioner::new_hash_aggregate_partitioner( + exprs.clone(), + *num_partitions, + max_aggr_partition_factor, + metrics.repartition_time.clone(), + )? + } else { + BatchPartitioner::new_hash_partitioner( + exprs.clone(), + *num_partitions, + metrics.repartition_time.clone(), + )? + } } Partitioning::RoundRobinBatch(num_partitions) => { BatchPartitioner::new_round_robin_partitioner( @@ -1683,13 +2375,13 @@ impl RepartitionExec { continue; } - for res in partitioner.partition_iter(batch)? { - let (partition, batch) = res?; + for res in partitioner.partition_iter_with_relative_partition(batch)? { + let (partition, relative_partition, batch) = res?; let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it if let Some(output_channel) = output_channels.get_mut(&partition) { - for batch in output_channel.coalesce(batch)? { + for batch in output_channel.coalesce(relative_partition, batch)? { if output_channel.send(batch).await.is_err() { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // so ignore this channel from now on. @@ -2020,6 +2712,119 @@ mod tests { use datafusion_physical_expr::{PhysicalSortExpr, RangePartitioning, SplitPoint}; use insta::assert_snapshot; + // Covers hash aggregate partitioning mapping multiple aggregate partitions + // back to one output partition. + // Example: with 2 output partitions and factor 4, each non-empty relative + // partition is in 0..4 and the output partition is in 0..2. + #[test] + fn test_hash_aggregate_partitioner_maps_subpartitions_to_output_partitions() + -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(UInt32Array::from_iter_values(0..128))], + )?; + let timer = metrics::Time::default(); + let mut partitioner = BatchPartitioner::new_hash_aggregate_partitioner( + vec![col("c0", &schema)?], + 2, + 4, + timer, + )?; + + let mut seen = vec![vec![false; 4]; 2]; + let mut num_rows = 0; + for result in partitioner.partition_iter_with_relative_partition(batch)? { + let (partition, relative_partition, batch) = result?; + assert!(partition < 2); + assert_eq!(relative_partition, 0); + num_rows += batch.num_rows(); + + let runs = partition_runs(batch.schema_ref())?.unwrap(); + for run in runs { + assert!(run.relative_partition < 4); + seen[partition][run.relative_partition] = true; + } + } + + assert_eq!(num_rows, 128); + assert!(seen.iter().flatten().all(|seen| *seen)); + Ok(()) + } + + // Covers hash aggregate output coalescing ordering rows by relative + // aggregate partition and encoding partition runs in batch metadata. + // Example: pushing p1 then p0 still outputs rows as p0, then p1. + #[test] + fn test_hash_aggregate_output_coalescer_orders_partition_runs() -> Result<()> { + let schema = test_schema(); + let mut coalescer = HashAggregateOutputPartitionCoalescer::new(&schema, 4, 3); + + coalescer.push_batch(1, uint32_batch(&schema, &[10, 11]))?; + assert!(coalescer.next_completed_batch().is_none()); + coalescer.push_batch(0, uint32_batch(&schema, &[1, 2]))?; + + let output = coalescer.next_completed_batch().unwrap(); + assert_eq!(uint32_values(&output), vec![1, 2, 10, 11]); + assert_eq!( + output + .schema_ref() + .metadata() + .get("datafusion.internal.hash_aggr_partition_runs"), + Some(&"0:2,1:2".to_string()) + ); + assert!(coalescer.next_completed_batch().is_none()); + + Ok(()) + } + + // Covers slicing an incoming batch repeatedly when the remaining rows still + // exceed target_batch_size. + // Example: target 4 and one 10-row input yields 4, 4, then residual 2. + #[test] + fn test_hash_aggregate_output_coalescer_splits_large_batch() -> Result<()> { + let schema = test_schema(); + let mut coalescer = HashAggregateOutputPartitionCoalescer::new(&schema, 4, 2); + + coalescer + .push_batch(1, uint32_batch(&schema, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))?; + + let first = coalescer.next_completed_batch().unwrap(); + let second = coalescer.next_completed_batch().unwrap(); + assert!(coalescer.next_completed_batch().is_none()); + + coalescer.finish()?; + let third = coalescer.next_completed_batch().unwrap(); + + assert_eq!(uint32_values(&first), vec![0, 1, 2, 3]); + assert_eq!(uint32_values(&second), vec![4, 5, 6, 7]); + assert_eq!(uint32_values(&third), vec![8, 9]); + assert_eq!( + first + .schema_ref() + .metadata() + .get("datafusion.internal.hash_aggr_partition_runs"), + Some(&"1:4".to_string()) + ); + assert_eq!( + second + .schema_ref() + .metadata() + .get("datafusion.internal.hash_aggr_partition_runs"), + Some(&"1:4".to_string()) + ); + assert_eq!( + third + .schema_ref() + .metadata() + .get("datafusion.internal.hash_aggr_partition_runs"), + Some(&"1:2".to_string()) + ); + + Ok(()) + } + #[test] fn strength_reduced_u64_remainder_matches_modulo() { let divisors = [ @@ -2228,6 +3033,24 @@ mod tests { Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) } + fn uint32_batch(schema: &SchemaRef, values: &[u32]) -> RecordBatch { + RecordBatch::try_new( + Arc::clone(schema), + vec![Arc::new(UInt32Array::from(values.to_vec()))], + ) + .unwrap() + } + + fn uint32_values(batch: &RecordBatch) -> Vec { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec() + } + async fn repartition( schema: &SchemaRef, input_partitions: Vec>,