diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs index db03380bb005..3519b8e133d0 100644 --- a/arrow-flight/benches/flight.rs +++ b/arrow-flight/benches/flight.rs @@ -18,6 +18,7 @@ use arrow_array::RecordBatch; use arrow_flight::{ FlightClient, FlightData, + decode::FlightRecordBatchStream, encode::{DictionaryHandling, FlightDataEncoderBuilder}, }; use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; @@ -66,6 +67,43 @@ async fn roundtrip(channel: Channel, batch: RecordBatch) { .unwrap(); } +fn bench_decode(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut g = c.benchmark_group("decode"); + + for &(name, build) in TYPES { + for &rows in &ROWS { + for &cols in &COLS { + let batch = build_batch(name, rows, cols, build); + let frames: Vec = rt + .block_on( + FlightDataEncoderBuilder::new() + .build(futures::stream::iter([Ok(batch.clone())])) + .try_collect(), + ) + .unwrap(); + let id = BenchmarkId::new(name, format!("{rows}x{cols}")); + g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); + g.bench_function(id, |b| { + b.to_async(&rt).iter_batched( + || frames.clone(), + |frames| async move { + let _: Vec = + FlightRecordBatchStream::new_from_flight_data( + futures::stream::iter(frames.into_iter().map(Ok)), + ) + .try_collect() + .await + .unwrap(); + }, + criterion::BatchSize::SmallInput, + ); + }); + } + } + } +} + fn bench_roundtrip(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let (channel, _) = rt.block_on(start_server()); @@ -134,6 +172,7 @@ fn bench_do_put_dictionary(c: &mut Criterion) { criterion_group!( benches, bench_encode, + bench_decode, bench_roundtrip, bench_do_put_dictionary );