diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs index 3519b8e133d0..1c6e50dad60d 100644 --- a/arrow-flight/benches/flight.rs +++ b/arrow-flight/benches/flight.rs @@ -30,6 +30,7 @@ use common::{DICT_TYPES, TYPES, build_batch, start_server}; const ROWS: [usize; 2] = [8 * 1024, 64 * 1024]; const COLS: [usize; 3] = [1, 4, 8]; +const BATCHES: usize = 4; fn bench_encode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -74,16 +75,22 @@ fn bench_decode(c: &mut Criterion) { for &(name, build) in TYPES { for &rows in &ROWS { for &cols in &COLS { - let batch = build_batch(name, rows, cols, build); + let batches: Vec<RecordBatch> = (0..BATCHES) + .map(|_| build_batch(name, rows, cols, build)) + .collect(); + let total_bytes: u64 = batches + .iter() + .map(|b| b.get_array_memory_size() as u64) + .sum(); let frames: Vec<FlightData> = rt .block_on( FlightDataEncoderBuilder::new() - .build(futures::stream::iter([Ok(batch.clone())])) + .build(futures::stream::iter(batches.into_iter().map(Ok))) .try_collect(), ) .unwrap(); let id = BenchmarkId::new(name, format!("{rows}x{cols}")); - g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); + g.throughput(Throughput::Bytes(total_bytes)); g.bench_function(id, |b| { b.to_async(&rt).iter_batched( || frames.clone(), @@ -124,6 +131,52 @@ fn bench_roundtrip(c: &mut Criterion) { } } +/// Decode a multi-batch stream covering both plain and dictionary types. +/// Uses [`DictionaryHandling::Resend`] to exercise the replacement-dictionary path. 
+fn bench_decode_dictionary_stream(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut g = c.benchmark_group("decode_stream"); + + for &(name, build) in TYPES.iter().chain(DICT_TYPES) { + for &rows in &ROWS { + for &cols in &COLS { + let batches: Vec<RecordBatch> = (0..BATCHES) + .map(|_| build_batch(name, rows, cols, build)) + .collect(); + let total_bytes: u64 = batches + .iter() + .map(|b| b.get_array_memory_size() as u64) + .sum(); + let frames: Vec<FlightData> = rt + .block_on( + FlightDataEncoderBuilder::new() + .with_dictionary_handling(DictionaryHandling::Resend) + .build(futures::stream::iter(batches.into_iter().map(Ok))) + .try_collect(), + ) + .unwrap(); + let id = BenchmarkId::new(name, format!("{rows}x{cols}x{BATCHES}")); + g.throughput(Throughput::Bytes(total_bytes)); + g.bench_function(id, |b| { + b.to_async(&rt).iter_batched( + || frames.clone(), + |frames| async move { + let _: Vec<RecordBatch> = + FlightRecordBatchStream::new_from_flight_data( + futures::stream::iter(frames.into_iter().map(Ok)), + ) + .try_collect() + .await + .unwrap(); + }, + criterion::BatchSize::SmallInput, + ); + }); + } + } + } +} + fn bench_do_put_dictionary(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let (channel, _) = rt.block_on(start_server()); @@ -173,6 +226,7 @@ criterion_group!( benches, bench_encode, bench_decode, + bench_decode_dictionary_stream, bench_roundtrip, bench_do_put_dictionary );