diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index a754816d5fc1a..103c3e03c06df 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -39,10 +39,12 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{Column, col}; +use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; +use datafusion_physical_plan::metrics::MetricValue; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::StreamExt; @@ -69,16 +71,18 @@ async fn test_sort_with_limited_memory() -> Result<()> { // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { + let metrics = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), number_of_record_batches: 100, get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -119,6 +123,7 @@ async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() -> } }), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -157,6 +162,7 @@ async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_c } }), memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -195,6 +201,7 @@ async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_t } }), memory_behavior: MemoryBehavior::TakeAllMemoryAtTheBeginning, + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -227,6 +234,7 @@ async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> { number_of_record_batches: 100, get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 6), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -252,21 +260,33 @@ async fn test_sort_with_limited_memory_and_oversized_record_batch() -> Result<() )) }; + let number_of_record_batches = 100; + // Each spilled run's largest batch is so big that two merge streams cannot be // reserved at once even at the smallest read-buffer size (`2 * (2 * batch) > // pool`), yet a single stream still fits (`2 * batch < pool`). Reducing the // buffer size therefore cannot help, the multi-level merge has to re-spill a // run with a smaller batch size to make progress instead of failing with // `ResourcesExhausted`. - run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { + let metrics = run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), - number_of_record_batches: 100, + number_of_record_batches, get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 3), memory_behavior: Default::default(), + + assert_all_output_batches_roughly_match_batch_size_conf: false, }) .await?; + let output_batches = get_output_batches_from_metrics(&metrics); + + // minimum 2 batches more + assert!( + output_batches >= number_of_record_batches + 2, + "output_batches {output_batches} should be greater than number_of_record_batches ({number_of_record_batches}) + 2" + ); + Ok(()) } @@ -277,6 +297,9 @@ struct RunTestWithLimitedMemoryArgs { get_size_of_record_batch_to_generate: Pin usize + Send + 'static>>, memory_behavior: MemoryBehavior, + + /// When true we would `assert_eq(the number of output_rows metric / output_batches metric == task_ctx.batch_size)` + assert_all_output_batches_roughly_match_batch_size_conf: bool, } #[derive(Default)] @@ -289,7 +312,7 @@ enum MemoryBehavior { async fn run_sort_test_with_limited_memory( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -349,7 +372,23 @@ async fn run_sort_test_with_limited_memory( let result = sort_exec.execute(0, Arc::clone(&args.task_ctx))?; - run_test(args, sort_exec, result).await + let number_of_record_batches = args.number_of_record_batches; + let assert_output_batch_size = + args.assert_all_output_batches_roughly_match_batch_size_conf; + + let metrics = run_test(args, sort_exec, result).await?; + + assert_baseline_metrics_for_non_empty_output( + &metrics, + number_of_record_batches * record_batch_size as usize, + if assert_output_batch_size { + Some(record_batch_size as usize) + } else { + None + }, + ); + + Ok(metrics) } fn grow_memory_as_much_as_possible( @@ -383,17 +422,19 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<() // Basic test with a lot of groups that cannot all fit in memory and 1 record batch // from each spill file is too much memory - let spill_count = + let metrics = run_test_aggregate_with_high_cardinality(RunTestWithLimitedMemoryArgs { pool_size, task_ctx: Arc::new(task_ctx), number_of_record_batches: 100, get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; - let total_spill_files_size = spill_count * record_batch_size; + let total_spill_files_size = + metrics.spill_count().unwrap_or_default() * record_batch_size; assert!( total_spill_files_size > pool_size, "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", @@ -430,6 +471,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_different_ } }), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -464,6 +506,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_different_ } }), memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -498,6 +541,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_different_ } }), memory_behavior: MemoryBehavior::TakeAllMemoryAtTheBeginning, + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -527,6 +571,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_reco number_of_record_batches: 100, get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 6), memory_behavior: Default::default(), + assert_all_output_batches_roughly_match_batch_size_conf: true, }) .await?; @@ -535,7 +580,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_reco async fn run_test_aggregate_with_high_cardinality( mut args: RunTestWithLimitedMemoryArgs, -) -> Result { +) -> Result { let get_size_of_record_batch_to_generate = std::mem::replace( &mut args.get_size_of_record_batch_to_generate, Box::pin(move |_| unreachable!("should not be called after take")), @@ -624,12 +669,13 @@ async fn run_test( args: RunTestWithLimitedMemoryArgs, plan: Arc, result_stream: SendableRecordBatchStream, -) -> Result { +) -> Result { let number_of_record_batches = args.number_of_record_batches; consume_stream_and_simulate_other_running_memory_consumers(args, result_stream) .await?; + let metrics = plan.metrics().expect("must have metrics"); let spill_count = assert_spill_count_metric(true, plan); assert!( @@ -637,7 +683,7 @@ async fn run_test( "Expected spill, but did not, number of record batches: {number_of_record_batches}", ); - Ok(spill_count) + Ok(metrics) } /// Consume the stream and change the amount of memory used while consuming it based on the [`MemoryBehavior`] provided @@ -693,3 +739,56 @@ async fn consume_stream_and_simulate_other_running_memory_consumers( Ok(()) } + +/// Assert baseline metrics are as expected or around that +/// +/// `output_batch_size` should be `None` when you expect to not get batched at the same size +/// `Some(session conf batch size)` for the rest +fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + expected_output_rows: usize, + output_batch_size: Option, +) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(expected_output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = get_output_batches_from_metrics(metrics); + + if let Some(output_batch_size) = output_batch_size { + assert_eq!( + output_batches, + expected_output_rows.div_ceil(output_batch_size) + ); + } else { + assert_ne!(output_batches, 0,); + } +} + +fn get_output_batches_from_metrics(metrics: &MetricsSet) -> usize { + metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total.value()), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline") +} diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index e52a6edb82fd4..8e292900b1d30 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -33,7 +33,7 @@ use datafusion_execution::memory_pool::MemoryReservation; use crate::sorts::builder::try_grow_reservation_to_at_least; use crate::sorts::sort::get_reserved_bytes_for_record_batch_size; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::TryStreamExt; @@ -242,27 +242,34 @@ impl MultiLevelMergeBuilder { fn merge_sorted_runs_within_mem_limit(&mut self) -> Result { match (self.sorted_spill_files.len(), self.sorted_streams.len()) { // No data so empty batch - (0, 0) => Ok(MergeStep::Stream(Box::pin(EmptyRecordBatchStream::new( - Arc::clone(&self.schema), - )))), + (0, 0) => { + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + Ok(MergeStep::Stream(self.observe_output(empty_stream))) + } // Only in-memory stream, return that - (0, 1) => Ok(MergeStep::Stream(self.sorted_streams.remove(0))), + (0, 1) => { + let output_stream = self.sorted_streams.remove(0); + Ok(MergeStep::Stream(self.observe_output(output_stream))) + } // Only single sorted spill file so return it (1, 0) => { let spill_file = self.sorted_spill_files.remove(0); // Not reserving any memory for this disk as we are not holding it in memory - Ok(MergeStep::Stream( - self.spill_manager - .read_spill_as_stream(spill_file.file, None)?, - )) + let output_stream = self + .spill_manager + .read_spill_as_stream(spill_file.file, None)?; + + Ok(MergeStep::Stream(self.observe_output(output_stream))) } // Only in memory streams, so merge them all in a single pass (0, _) => { let sorted_stream = mem::take(&mut self.sorted_streams); + // No need to wrap with observed stream since merge sort will update the observed metrics Ok(MergeStep::Stream(self.create_new_merge_sort( sorted_stream, // If we have no sorted spill files left, this is the last run @@ -574,6 +581,13 @@ impl MultiLevelMergeBuilder { Ok(()) } + + fn observe_output( + &self, + stream: SendableRecordBatchStream, + ) -> SendableRecordBatchStream { + Box::pin(ObservedStream::new(stream, self.metrics.clone(), None)) + } } /// Outcome of trying to reserve memory for one multi-level merge pass. diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 868ab64e90885..792c432155a8b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -46,8 +46,8 @@ use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; use crate::statistics::StatisticsArgs; -use crate::stream::RecordBatchStreamAdapter; use crate::stream::ReservationStream; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::topk::TopK; use crate::topk::TopKDynamicFilters; use crate::{ @@ -374,7 +374,7 @@ impl ExternalSorter { // allocation. Only needed for the non-spill path; the spill // path transfers the reservation to the merge stream instead. self.merge_reservation.free(); - self.in_mem_sort_stream(self.metrics.baseline.clone(), true) + self.in_mem_sort_stream(true, true) } } @@ -476,9 +476,11 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - // No coalescing on the spill path: it raises per-run peak memory. - let mut sorted_stream = - self.in_mem_sort_stream(self.metrics.baseline.intermediate(), false)?; + let mut sorted_stream = self.in_mem_sort_stream( + false, + // No coalescing on the spill path: it raises per-run peak memory. + false, + )?; // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. assert_or_internal_err!( @@ -589,18 +591,18 @@ impl ExternalSorter { /// reduce merge fan-in. Disabled on the spill path to keep peak memory low. fn in_mem_sort_stream( &mut self, - metrics: BaselineMetrics, + is_output_stream: bool, coalesce_runs: bool, ) -> Result { if self.in_mem_batches.is_empty() { - return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))); + let empty_stream = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))); + return Ok(self.observe_if_output(empty_stream, is_output_stream)); } // The elapsed compute timer is updated when the value is dropped. // There is no need for an explicit call to drop. - let elapsed_compute = metrics.elapsed_compute().clone(); + let elapsed_compute = self.metrics.baseline.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // Please pay attention that any operation inside of `in_mem_sort_stream` will @@ -612,7 +614,8 @@ impl ExternalSorter { if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + return Ok(self.observe_if_output(sorted_stream, is_output_stream)); } // If less than sort_in_place_threshold_bytes, concatenate and sort in place @@ -624,7 +627,8 @@ impl ExternalSorter { .try_resize(get_reserved_bytes_for_record_batch(&batch)?) .map_err(Self::err_with_oom_context)?; let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, &metrics, reservation); + let sorted_stream = self.sort_batch_stream(batch, reservation)?; + return Ok(self.observe_if_output(sorted_stream, is_output_stream)); } // For single-column sorts, coalesce the buffered batches into fewer, @@ -642,11 +646,10 @@ impl ExternalSorter { let streams = runs .into_iter() .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); let reservation = self .reservation .split(get_reserved_bytes_for_record_batch(&batch)?); - let input = self.sort_batch_stream(batch, &metrics, reservation)?; + let input = self.sort_batch_stream(batch, reservation)?; Ok(spawn_buffered(input, 1)) }) .collect::>()?; @@ -655,7 +658,11 @@ impl ExternalSorter { .with_streams(streams) .with_schema(Arc::clone(&self.schema)) .with_expressions(&self.expr.clone()) - .with_metrics(metrics) + .with_metrics(if is_output_stream { + self.metrics.baseline.clone() + } else { + self.metrics.baseline.intermediate() + }) .with_batch_size(self.batch_size) .with_fetch(None) .with_reservation(self.merge_reservation.new_empty()) @@ -726,7 +733,6 @@ impl ExternalSorter { fn sort_batch_stream( &self, batch: RecordBatch, - metrics: &BaselineMetrics, reservation: MemoryReservation, ) -> Result { assert_eq!( @@ -737,7 +743,6 @@ impl ExternalSorter { let schema = batch.schema(); let expressions = self.expr.clone(); let batch_size = self.batch_size; - let output_row_metrics = metrics.output_rows().clone(); let stream = futures::stream::once(async move { let schema = batch.schema(); @@ -767,14 +772,7 @@ impl ExternalSorter { reservation, )) as SendableRecordBatchStream) }) - .try_flatten() - .map(move |batch| match batch { - Ok(batch) => { - output_row_metrics.add(batch.num_rows()); - Ok(batch) - } - Err(e) => Err(e), - }); + .try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } @@ -833,6 +831,22 @@ impl ExternalSorter { _ => e, } } + + fn observe_if_output( + &self, + mut stream: SendableRecordBatchStream, + wrap: bool, + ) -> SendableRecordBatchStream { + if wrap { + stream = Box::pin(ObservedStream::new( + stream, + self.metrics.baseline.clone(), + None, + )) + } + + stream + } } /// Estimate how much memory is needed to sort a `RecordBatch`. @@ -1564,6 +1578,7 @@ mod tests { use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{DynamicFilterTracking, EquivalenceProperties}; + use datafusion_physical_expr_common::metrics::MetricValue; use futures::{FutureExt, Stream, TryStreamExt}; use insta::assert_snapshot; @@ -2506,7 +2521,8 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> { + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics() + -> Result<()> { let batch_size = 100; let create_task_ctx = |_: &[RecordBatch]| { @@ -2518,19 +2534,22 @@ mod tests { }; // Smaller than batch size and require more than a single batch to get the requested batch size - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size / 4, create_task_ctx) + .await?; // Not evenly divisible by batch size - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size + 7, create_task_ctx) + .await?; // Evenly divisible by batch size and is larger than 2 output batches - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + test_sort_output_batch_size_and_base_metrics(10, batch_size * 3, create_task_ctx) + .await?; Ok(()) } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_sorting_in_place() -> Result<()> { let batch_size = 100; @@ -2544,8 +2563,12 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2556,8 +2579,12 @@ mod tests { // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2568,8 +2595,12 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_eq!( metrics.spill_count(), @@ -2582,7 +2613,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_a_single_batch() -> Result<()> { let batch_size = 100; @@ -2593,7 +2624,7 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size / 4, @@ -2610,7 +2641,7 @@ mod tests { // Not evenly divisible by batch size { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size + 7, @@ -2627,7 +2658,7 @@ mod tests { // Evenly divisible by batch size and is larger than 2 output batches { - let metrics = test_sort_output_batch_size( + let metrics = test_sort_output_batch_size_and_base_metrics( // Single batch 1, batch_size * 3, @@ -2646,7 +2677,7 @@ mod tests { } #[tokio::test] - async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill() + async fn should_return_stream_with_batches_in_the_requested_size_and_update_metrics_when_having_to_spill() -> Result<()> { let batch_size = 100; @@ -2674,24 +2705,36 @@ mod tests { // Smaller than batch size and require more than a single batch to get the requested batch size { - let metrics = - test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size / 4, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Not evenly divisible by batch size { - let metrics = - test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size + 7, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } // Evenly divisible by batch size and is larger than 2 batches { - let metrics = - test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?; + let metrics = test_sort_output_batch_size_and_base_metrics( + 10, + batch_size * 3, + create_task_ctx, + ) + .await?; assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill"); } @@ -2699,7 +2742,7 @@ mod tests { Ok(()) } - async fn test_sort_output_batch_size( + async fn test_sort_output_batch_size_and_base_metrics( number_of_batches: usize, batch_size_to_generate: usize, create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext, @@ -2709,10 +2752,13 @@ mod tests { .collect::>(); let task_ctx = create_task_ctx(batches.as_slice()); + let output_rows = batches.iter().map(|item| item.num_rows()).sum(); + let expected_batch_size = task_ctx.session_config().batch_size(); + let schema = batches[0].schema(); let (mut output_batches, metrics) = - run_sort_on_input(task_ctx, "i", batches).await?; + run_sort_on_input(task_ctx, "i", batches, schema).await?; let last_batch = output_batches.pop().unwrap(); @@ -2727,18 +2773,87 @@ mod tests { } assert_eq!(last_batch.num_rows(), last_expected_batch_size); + assert_baseline_metrics_for_non_empty_output( + &metrics, + output_rows, + expected_batch_size, + ); + Ok(metrics) } + #[tokio::test] + async fn empty_sort_stream_should_report_end_time() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let task_ctx = TaskContext::default(); + + let (_, metrics) = run_sort_on_input(task_ctx, "i", vec![], schema).await?; + + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_eq!( + metrics.spill_count().unwrap_or_default(), + 0, + "expected to not have spills" + ); + assert_ne!(end_time.value(), None); + + Ok(()) + } + + fn assert_baseline_metrics_for_non_empty_output( + metrics: &MetricsSet, + output_rows: usize, + batch_size: usize, + ) { + let end_time = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::EndTimestamp(end) => Some(end), + _ => None, + }) + .expect("Must have end time metric since it exists in the baseline"); + + assert_ne!(end_time.value(), None); + + assert_eq!(metrics.output_rows(), Some(output_rows)); + + let output_bytes = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBytes(total) => Some(total), + _ => None, + }) + .expect("Must have output_bytes metric since it exists in the baseline"); + + assert_ne!(output_bytes.value(), 0_usize); + + let output_batches = metrics + .iter() + .find_map(|item| match item.value() { + MetricValue::OutputBatches(total) => Some(total), + _ => None, + }) + .expect("Must have output_batches metric since it exists in the baseline"); + + assert_eq!(output_batches.value(), output_rows.div_ceil(batch_size)); + } + async fn run_sort_on_input( task_ctx: TaskContext, order_by_col: &str, batches: Vec, + schema: SchemaRef, ) -> Result<(Vec, MetricsSet)> { let task_ctx = Arc::new(task_ctx); // let task_ctx = env. - let schema = batches[0].schema(); let ordering: LexOrdering = [PhysicalSortExpr { expr: col(order_by_col, &schema)?, options: SortOptions { @@ -2749,7 +2864,11 @@ mod tests { .into(); let sort_exec: Arc = Arc::new(SortExec::new( ordering.clone(), - TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?, + TestMemoryExec::try_new_exec( + std::slice::from_ref(&batches), + Arc::clone(&schema), + None, + )?, )); let sorted_batches = @@ -2759,11 +2878,10 @@ mod tests { // assert output { - let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?; + let input_batches_concat = concat_batches(&schema, &batches)?; let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?; - let sorted_batches_concat = - concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?; + let sorted_batches_concat = concat_batches(&schema, &sorted_batches)?; assert_eq!(sorted_input_batch, sorted_batches_concat); } diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 665c8d7f440fb..ae7ca1ababa86 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -423,7 +423,7 @@ JOIN t2 ON t1.k > t2.k; ---- Plan with Metrics 01)PiecewiseMergeJoin: operator=Gt, join_type=Inner, on=(k > k), metrics=[output_bytes=0.0 B, build_mem_used=144.0 B] -02)--SortExec: expr=[k@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +02)--SortExec: expr=[k@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=16.0 B, spilled_bytes=0.0 B] 03)----ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] 04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] 05)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] @@ -456,9 +456,9 @@ JOIN ea_smj_t2 ON ea_smj_t1.a = ea_smj_t2.a ---- Plan with Metrics 01)SortMergeJoinExec: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64), metrics=[output_bytes=320.0 KB, spilled_bytes=0.0 B, peak_mem_used=432.0 B] -02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=60.0 B, spilled_bytes=0.0 B] 03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] -04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=40.0 B, spilled_bytes=0.0 B] 05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] query TT @@ -469,9 +469,9 @@ LEFT SEMI JOIN ea_smj_t2 ON ea_smj_t1.a = ea_smj_t2.a; ---- Plan with Metrics 01)SortMergeJoinExec: join_type=LeftSemi, on=[(a@0, a@0)], metrics=[output_bytes=160.0 KB, spilled_bytes=0.0 B, peak_mem_used=0.0 B] -02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=60.0 B, spilled_bytes=0.0 B] 03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] -04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=32.0 B, spilled_bytes=0.0 B] 05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] statement ok