From 3c431a58641cb3f5a3ac49faf289d400156012c0 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 2 Jul 2026 11:00:51 +0300 Subject: [PATCH 1/2] fix: Batch size limit in re-spill compounds --- .../physical-plan/src/aggregates/row_hash.rs | 3 + .../src/sorts/multi_level_merge.rs | 341 ++++++++++++++---- datafusion/physical-plan/src/sorts/sort.rs | 3 + .../src/sorts/streaming_merge.rs | 19 +- 4 files changed, 292 insertions(+), 74 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a4d19b0f7d18..f2c43fd9c7ce 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1221,6 +1221,9 @@ impl GroupedHashAggregateStream { self.spill_state.spills.push(SortedSpillFile { file: spillfile, max_record_batch_memory, + // Spilled aggregate runs use the normal batch size, so they + // impose no cap on later merge output batch sizes. + batch_size_limit: None, }) } None => { diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index e52a6edb82fd..27134073ddab 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -20,6 +20,7 @@ use crate::metrics::BaselineMetrics; use crate::{EmptyRecordBatchStream, SpillManager}; use arrow::array::RecordBatch; +use std::collections::VecDeque; use std::fmt::{Debug, Formatter}; use std::mem; use std::pin::Pin; @@ -131,14 +132,24 @@ use futures::{Stream, StreamExt}; /// reserve memory for the minimum of 2 streams - because a single run's largest batch is so /// wide that two streams' worth of reservation exceeds the budget - the larger of the two /// runs is re-spilled with each batch sliced in half. This shrinks its largest batch, -/// lowering the per-stream reservation, and the merge pass is retried. The merge output -/// batch size is halved as well so the merged run cannot rebuild a full-size batch and -/// reintroduce the skew. If a batch cannot be split any further (a single row wider than the -/// budget), the merge surfaces `ResourcesExhausted` instead of looping forever. +/// lowering the per-stream reservation, and the merge pass is retried. The re-spilled run +/// records a per-run [`batch_size_limit`](SortedSpillFile::batch_size_limit) equal to half +/// the batch size it was written with, so any later merge that includes it caps its output +/// batch size to match - otherwise the merged run could rebuild a full-size batch and +/// reintroduce the skew. Crucially the global merge batch size is *not* lowered: the shrunk +/// run is pushed to the back of the queue, so passes that don't touch it keep merging at the +/// full batch size, and only the final passes that consume the shrunk run pay the reduced +/// batch size. If a batch cannot be split any further (a single row wider than the budget), +/// the merge surfaces `ResourcesExhausted` instead of looping forever. pub(crate) struct MultiLevelMergeBuilder { spill_manager: SpillManager, schema: SchemaRef, - sorted_spill_files: Vec, + /// Sorted runs still to be merged, held as a queue: passes consume runs from the + /// front, and runs re-spilled smaller to resolve skew are pushed to the back (see + /// [`Self::split_spill_file_in_half`]) so the deferred, reduced-batch-size merges + /// happen last. The k-way merge is order-independent, so the ordering is purely a + /// heuristic and never affects correctness. + sorted_spill_files: VecDeque, sorted_streams: Vec, expr: LexOrdering, metrics: BaselineMetrics, @@ -171,7 +182,7 @@ impl MultiLevelMergeBuilder { Self { spill_manager, schema, - sorted_spill_files, + sorted_spill_files: sorted_spill_files.into(), sorted_streams, expr, metrics, @@ -191,17 +202,22 @@ impl MultiLevelMergeBuilder { async fn create_stream(mut self) -> Result { loop { - let mut stream = match self.merge_sorted_runs_within_mem_limit()? { - MergeStep::Stream(stream) => stream, - MergeStep::SplitThenRetry(index) => { - // Couldn't reserve memory for the minimum of 2 streams. Re-spill the - // larger of the two we're trying to merge with half its batch size so - // its largest batch shrinks, lowering the per-stream reservation, then - // retry. Makes the merge resilient to skewed (very wide) rows. - self.split_spill_file_in_half(index).await?; - continue; - } - }; + let (mut stream, batch_size_limit) = + match self.merge_sorted_runs_within_mem_limit()? { + MergeStep::Stream { + stream, + batch_size_limit, + } => (stream, batch_size_limit), + MergeStep::SplitThenRetry(index) => { + // Couldn't reserve memory for the minimum of 2 streams. Re-spill + // the larger of the two we're trying to merge with half its batch + // size so its largest batch shrinks, lowering the per-stream + // reservation, then retry. Makes the merge resilient to skewed + // (very wide) rows. + self.split_spill_file_in_half(index).await?; + continue; + } + }; // TODO - add a threshold for number of files to disk even if empty and reading from disk so // we can avoid the memory reservation @@ -229,10 +245,14 @@ impl MultiLevelMergeBuilder { continue; }; - // Add the spill file - self.sorted_spill_files.push(SortedSpillFile { + // Add the spill file. It inherits the batch-size limit of the merge that + // produced it: if that merge consumed a shrunk (skew-resolved) run, its + // output was capped and this intermediate run is likewise capped, so a + // later pass that re-merges it won't rebuild an oversized batch. + self.sorted_spill_files.push_back(SortedSpillFile { file: spill_file, max_record_batch_memory, + batch_size_limit, }); } } @@ -242,33 +262,48 @@ impl MultiLevelMergeBuilder { fn merge_sorted_runs_within_mem_limit(&mut self) -> Result { match (self.sorted_spill_files.len(), self.sorted_streams.len()) { // No data so empty batch - (0, 0) => Ok(MergeStep::Stream(Box::pin(EmptyRecordBatchStream::new( - Arc::clone(&self.schema), - )))), + (0, 0) => Ok(MergeStep::Stream { + stream: Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))), + batch_size_limit: None, + }), // Only in-memory stream, return that - (0, 1) => Ok(MergeStep::Stream(self.sorted_streams.remove(0))), + (0, 1) => Ok(MergeStep::Stream { + stream: self.sorted_streams.remove(0), + batch_size_limit: None, + }), // Only single sorted spill file so return it (1, 0) => { - let spill_file = self.sorted_spill_files.remove(0); + let spill_file = self + .sorted_spill_files + .pop_front() + .expect("matched exactly one spill file"); // Not reserving any memory for this disk as we are not holding it in memory - Ok(MergeStep::Stream( - self.spill_manager + Ok(MergeStep::Stream { + stream: self + .spill_manager .read_spill_as_stream(spill_file.file, None)?, - )) + batch_size_limit: None, + }) } - // Only in memory streams, so merge them all in a single pass + // Only in memory streams, so merge them all in a single pass. In-memory + // runs are never shrunk for skew, so this merge runs at the full batch + // size and its output carries no limit. (0, _) => { let sorted_stream = mem::take(&mut self.sorted_streams); - Ok(MergeStep::Stream(self.create_new_merge_sort( - sorted_stream, - // If we have no sorted spill files left, this is the last run - true, - true, - )?)) + Ok(MergeStep::Stream { + stream: self.create_new_merge_sort( + sorted_stream, + // If we have no sorted spill files left, this is the last run + true, + true, + self.batch_size, + )?, + batch_size_limit: None, + }) } // Need to merge multiple streams @@ -304,6 +339,14 @@ impl MultiLevelMergeBuilder { } }; + // Cap the merge output at the smallest limit among the runs we're + // about to merge. Only runs that were shrunk for skew carry a limit; + // if none do, the merge runs at the full `self.batch_size`. The output + // is tagged with the same limit so a re-spilled intermediate run stays + // shrunk and won't rebuild an oversized batch on a later pass. + let batch_size_limit = min_batch_size_limit(&sorted_spill_files); + let output_batch_size = batch_size_limit.unwrap_or(self.batch_size); + // Don't account for existing streams memory // as we are not holding the memory for them let mut sorted_streams = mem::take(&mut self.sorted_streams); @@ -335,6 +378,7 @@ impl MultiLevelMergeBuilder { // If we have no sorted spill files left, this is the last run self.sorted_spill_files.is_empty(), is_only_merging_memory_streams, + output_batch_size, )?; // If we're only merging memory streams, we don't need to attach the memory reservation @@ -346,14 +390,20 @@ impl MultiLevelMergeBuilder { "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory" ); - Ok(MergeStep::Stream(merge_sort_stream)) + Ok(MergeStep::Stream { + stream: merge_sort_stream, + batch_size_limit, + }) } else { // Attach the memory reservation to the stream to make sure we have enough memory // throughout the merge process as we bypassed the memory pool for the merge sort stream - Ok(MergeStep::Stream(Box::pin(StreamAttachedReservation::new( - merge_sort_stream, - memory_reservation, - )))) + Ok(MergeStep::Stream { + stream: Box::pin(StreamAttachedReservation::new( + merge_sort_stream, + memory_reservation, + )), + batch_size_limit, + }) } } } @@ -364,11 +414,12 @@ impl MultiLevelMergeBuilder { streams: Vec, is_output: bool, all_in_memory: bool, + output_batch_size: usize, ) -> Result { let mut builder = StreamingMergeBuilder::new() .with_schema(Arc::clone(&self.schema)) .with_expressions(&self.expr) - .with_batch_size(self.batch_size) + .with_batch_size(output_batch_size) .with_fetch(self.fetch) .with_metrics(if is_output { // Only add the metrics to the last run @@ -482,32 +533,47 @@ impl MultiLevelMergeBuilder { Ok(SpillFilesToMerge::Ready(spills, buffer_len)) } - /// Re-spill the spill file at `index` with half its batch size, putting it back - /// at the same position. We read the file back and re-spill it through the normal - /// spill API (which owns batch layout). - /// Slicing each batch in two halves the largest written batch, - /// which lowers the per-stream merge reservation so the - /// next attempt can seat both streams. One stream's worth of memory is reserved - /// for the duration and freed afterwards. Makes the merge resilient to skew. + /// Re-spill the spill file at `index` with half its batch size to resolve skew. + /// We read the file back and re-spill it through the normal spill API (which owns + /// batch layout), slicing every batch in two, which halves the largest written + /// batch and so lowers the per-stream merge reservation enough for the next + /// attempt to seat both streams. One stream's worth of memory is reserved for the + /// duration and freed afterwards. + /// + /// Two things make this cheap for the overall sort: + /// - Instead of halving the *global* merge batch size (which would slow every + /// remaining pass), the shrunk run records its own + /// [`batch_size_limit`](SortedSpillFile::batch_size_limit); only merges that + /// actually consume it pay the reduced batch size. + /// - The shrunk run is pushed to the *back* of the queue rather than kept at + /// `index`. Passes drain from the front, so the bulk of the sort merges the + /// unconstrained runs at the full batch size and the shrunk run is folded in + /// near the end. The k-way merge is order-independent, so reordering the runs + /// does not affect the result. async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> { log::debug!( "2 spilled streams could not be loaded into memory for merge \ (requires 2x of the largest batch from both), re-spilling the larger of the two with half \ - the batch size to reduce memory needs for the next merge attempt, \ - setting batch_size to half to proceed with merge" + the batch size to reduce memory needs for the next merge attempt. the shrunk run carries \ + a halved batch-size limit so only merges consuming it use the smaller batch size" ); - // Extract the target in O(1) instead of `remove(index)`, which would shift - // every following spill file. Swap it to the back and pop it; the matching - // swap after re-spilling restores the original order, so the vec ends up - // exactly as it started, just with the target file shrunk. - let last = self.sorted_spill_files.len() - 1; - self.sorted_spill_files.swap(index, last); + // `index` is always 0 or 1 - it identifies the larger of the two runs at the + // front that we failed to seat together (see `get_sorted_spill_files_to_merge`). + debug_assert!(index <= 1, "split index must be one of the two front runs"); + // Extract the target in O(1). `swap_remove_front` fills the gap with the *front* + // element (not the back, as `Vec::swap_remove`/`swap_remove_back` would), so any + // already-deferred shrunk runs sitting at the back stay there. We then push this + // freshly shrunk run to the back so it too is merged after the unconstrained runs. let target = self .sorted_spill_files - .pop() - .expect("index is in bounds, so the vec is non-empty"); + .swap_remove_front(index) + .expect("split index is one of the two front runs, so it is in bounds"); let old_max = target.max_record_batch_memory; + // The batch size this run was written with (the full merge batch size unless + // it was already shrunk once). Halving it caps the next merge that reads this + // run so the merged output can't rebuild a full-size batch. + let old_batch_size = target.batch_size_limit.unwrap_or(self.batch_size); // Reserve enough to hold a single stream of this file while we re-spill it. let reservation = self.reservation.new_empty(); @@ -557,20 +623,19 @@ impl MultiLevelMergeBuilder { ); } - // Also halve the merge output batch size so the next merge pass emits - // narrower batches. Otherwise the merged stream would rebuild a full-size - // (potentially giant) batch and, when spilled back as an intermediate run, - // reintroduce the exact skew we just resolved. - self.batch_size = (self.batch_size / 2).max(1); + // Record the halved batch size as a *per-run* limit rather than lowering the + // global batch size. Merges that don't touch this run keep the full batch + // size. a merge that reads it caps its output at this limit so the merged run + // can't rebuild a full-size batch and reintroduce the skew. + let new_batch_size_limit = (old_batch_size / 2).max(1); - // Push the re-spilled (smaller) file and swap it back into `index`, undoing - // the swap-to-back above so the order is preserved. - self.sorted_spill_files.push(SortedSpillFile { + // Push the re-spilled (smaller) run to the back so it is merged after the + // unconstrained runs still ahead of it in the queue. + self.sorted_spill_files.push_back(SortedSpillFile { file, max_record_batch_memory: new_max, + batch_size_limit: Some(new_batch_size_limit), }); - let last = self.sorted_spill_files.len() - 1; - self.sorted_spill_files.swap(index, last); Ok(()) } @@ -588,11 +653,25 @@ enum SpillFilesToMerge { /// What one iteration of the multi-level merge loop should do next. enum MergeStep { /// A merged stream is ready to be consumed (and possibly spilled back). - Stream(SendableRecordBatchStream), + Stream { + stream: SendableRecordBatchStream, + /// The batch-size limit to stamp on the run if this stream is re-spilled as + /// an intermediate result. `Some(n)` when the merge that produced this stream + /// consumed a skew-resolved run (so its output was capped at `n` rows); + /// `None` when it ran at the full batch size. + batch_size_limit: Option, + }, /// Re-spill the spill file at this index smaller, then retry the merge step. SplitThenRetry(usize), } +/// Smallest [`SortedSpillFile::batch_size_limit`] among `files`, or `None` if none of +/// them is constrained. A merge over `files` must cap its output at this value so a +/// re-spilled result can't rebuild a batch wider than the most-constrained input. +fn min_batch_size_limit(files: &[SortedSpillFile]) -> Option { + files.iter().filter_map(|f| f.batch_size_limit).min() +} + /// Slice `batch` into two row-halves so a re-spill writes batches half the size. fn split_batch_in_half(batch: RecordBatch) -> Vec { let num_rows = batch.num_rows(); @@ -706,6 +785,7 @@ mod tests { SortedSpillFile { file, max_record_batch_memory, + batch_size_limit: None, } } @@ -847,8 +927,10 @@ mod tests { let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); assert_eq!(total_rows, (2 * n) as usize); - // The largest emitted batch is the halved size, not the original 8192 — - // without halving `self.batch_size` the merge would rebuild 8192-row batches. + // The largest emitted batch is the halved size, not the original 8192: the + // shrunk run carries a halved `batch_size_limit`, and the final pass consumes + // it, so the merge output is capped there. Without the per-run limit the merge + // would rebuild 8192-row batches. let expected_batch_size = initial_batch_size / 2; let max_batch_rows = batches.iter().map(|b| b.num_rows()).max().unwrap_or(0); assert_eq!( @@ -859,4 +941,123 @@ mod tests { Ok(()) } + + /// Splitting two *different* runs must not compound the batch-size reduction. + /// Each split halves only that run's own limit and leaves the global + /// `self.batch_size` untouched, so splitting two runs once each yields two runs + /// limited to half the original size - not a quarter, as a global-halving + /// implementation would produce. + #[tokio::test] + async fn splitting_two_runs_does_not_compound_the_batch_size_limit() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let schema = test_schema(); + let spill_manager = build_spill_manager(&env, &schema); + + let n: i64 = 16384; + let f0 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect()); + let f1 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect()); + + // Ample budget: every split's own single-stream reservation succeeds, so the + // only thing under test is how the batch-size limit is tracked. + let initial_batch_size = 8192; + let pool: Arc = Arc::new(GreedyMemoryPool::new(1024 * 1024 * 64)); + let mut builder = build_merge_builder( + spill_manager, + schema, + vec![f0, f1], + &pool, + initial_batch_size, + ); + + // Split the run at index 0. `swap_remove_front` moves the other run to the + // front and the shrunk run is pushed to the back, so index 0 now holds the + // run we have not split yet. + builder.split_spill_file_in_half(0).await?; + builder.split_spill_file_in_half(0).await?; + + // The global merge batch size is never lowered - the reduction is per-run. + assert_eq!( + builder.batch_size, initial_batch_size, + "splitting runs must not lower the global merge batch size" + ); + + // Both runs were split exactly once, so each is limited to half the original + // batch size. A global-halving implementation would have driven the second + // run down to a quarter. + let expected_limit = Some(initial_batch_size / 2); + for file in &builder.sorted_spill_files { + assert_eq!( + file.batch_size_limit, expected_limit, + "each run split once should be limited to half the original batch \ + size, not compounded" + ); + } + + Ok(()) + } + + /// End-to-end guard that reordering the runs (pushing shrunk runs to the back) + /// and merging at mixed batch sizes still produces the complete, fully-sorted + /// output when several runs must be re-spilled under a tight budget. + #[tokio::test] + async fn multiple_skewed_runs_merge_correctly() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let schema = test_schema(); + let spill_manager = build_spill_manager(&env, &schema); + + let n: i64 = 16384; + let f0 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect()); + let f1 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect()); + let f2 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect()); + let m = f0 + .max_record_batch_memory + .max(f1.max_record_batch_memory) + .max(f2.max_record_batch_memory); + + // Only ~3.5*m: not all three runs fit at once, forcing re-spills and the + // multi-pass merge that reorders runs to the back of the queue. + let pool: Arc = Arc::new(GreedyMemoryPool::new(m * 7 / 2)); + + let builder = build_merge_builder( + spill_manager, + Arc::clone(&schema), + vec![f0, f1, f2], + &pool, + 8192, + ); + let stream = builder.create_spillable_merge_stream(); + let batches: Vec = stream.try_collect().await?; + + // Every input row is present. + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, + (3 * n) as usize, + "the merge must emit every input row across all three runs" + ); + + // The output is globally sorted, and every value appears exactly three times + // (once per input run), proving no rows were dropped or duplicated by the + // reordering/re-spilling. + let merged = concat_batches(&schema, &batches)?; + let col = merged.column(0).as_primitive::(); + for i in 1..col.len() { + assert!( + col.value(i - 1) <= col.value(i), + "merge output must be sorted: {} > {} at {i}", + col.value(i - 1), + col.value(i), + ); + } + for value in 0..n { + let expected_index = (value as usize) * 3; + assert_eq!( + col.value(expected_index), + value, + "each value must appear exactly three times in sorted order" + ); + } + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 868ab64e9088..d0919d7fd5aa 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -456,6 +456,9 @@ impl ExternalSorter { self.finished_spill_files.push(SortedSpillFile { file: spill_file, max_record_batch_memory, + // Freshly spilled sorted runs are written at the normal batch size, + // so they impose no cap on later merge output batch sizes. + batch_size_limit: None, }); } diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index ade24ff0534f..4b73d047c59c 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -63,6 +63,15 @@ pub struct SortedSpillFile { /// how much memory the largest memory batch is taking pub max_record_batch_memory: usize, + + /// Upper bound on the merge output batch size when this run participates in a + /// merge. `None` means the run is unconstrained (written at the normal batch + /// size). `Some(n)` means the run was re-spilled with a shrunk batch layout to + /// resolve skew (see [`MultiLevelMergeBuilder`](crate::sorts::multi_level_merge)), + /// so any merge that includes it must cap its output at `n` rows, otherwise + /// re-spilling the merged result would rebuild an oversized batch and + /// reintroduce the skew. + pub batch_size_limit: Option, } impl std::fmt::Debug for SortedSpillFile { @@ -70,14 +79,16 @@ impl std::fmt::Debug for SortedSpillFile { match self.file.path() { Some(path) => write!( f, - "SortedSpillFile({:?}) takes {}", + "SortedSpillFile({:?}) takes {} (batch_size_limit: {:?})", path, - human_readable_size(self.max_record_batch_memory) + human_readable_size(self.max_record_batch_memory), + self.batch_size_limit, ), None => write!( f, - "SortedSpillFile() takes {}", - human_readable_size(self.max_record_batch_memory) + "SortedSpillFile() takes {} (batch_size_limit: {:?})", + human_readable_size(self.max_record_batch_memory), + self.batch_size_limit, ), } } From 0c2efaeae164c2e5778969b0192bc8bb3a66769a Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 2 Jul 2026 11:24:41 +0300 Subject: [PATCH 2/2] fix docs --- datafusion/physical-plan/src/sorts/streaming_merge.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 4b73d047c59c..7db113cf04ea 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -67,8 +67,8 @@ pub struct SortedSpillFile { /// Upper bound on the merge output batch size when this run participates in a /// merge. `None` means the run is unconstrained (written at the normal batch /// size). `Some(n)` means the run was re-spilled with a shrunk batch layout to - /// resolve skew (see [`MultiLevelMergeBuilder`](crate::sorts::multi_level_merge)), - /// so any merge that includes it must cap its output at `n` rows, otherwise + /// resolve skew (see `MultiLevelMergeBuilder`), so any merge that includes it + /// must cap its output at `n` rows, otherwise /// re-spilling the merged result would rebuild an oversized batch and /// reintroduce the skew. pub batch_size_limit: Option,