Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
use crate::{FlightData, FlightDescriptor, SchemaAsIpc, error::Result};

use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray};
use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteContext, IpcWriteOptions};

use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode};
use bytes::Bytes;
Expand Down Expand Up @@ -372,15 +372,15 @@ impl FlightDataEncoder {
DictionaryHandling::Resend => batch,
DictionaryHandling::Hydrate => hydrate_dictionaries(&batch, schema)?,
};

for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) = self.encoder.encode_batch(&batch)?;
let (flight_dictionaries, flight_batch) = self
.encoder
.encode_batch(&batch, self.max_flight_data_size)?;
for dict in flight_dictionaries {
self.queue_message(dict);
}
self.queue_message(flight_batch);
}

Ok(())
}
}
Expand Down Expand Up @@ -701,7 +701,7 @@ struct FlightIpcEncoder {
options: IpcWriteOptions,
data_gen: IpcDataGenerator,
dictionary_tracker: DictionaryTracker,
compression_context: CompressionContext,
compression_context: IpcWriteContext,
Comment thread
Rich-T-kid marked this conversation as resolved.
}

impl FlightIpcEncoder {
Expand All @@ -710,7 +710,7 @@ impl FlightIpcEncoder {
options,
data_gen: IpcDataGenerator::default(),
dictionary_tracker: DictionaryTracker::new(error_on_replacement),
compression_context: CompressionContext::default(),
compression_context: IpcWriteContext::default(),
}
}

Expand All @@ -724,7 +724,11 @@ impl FlightIpcEncoder {
fn encode_batch(
&mut self,
batch: &RecordBatch,
) -> Result<(impl Iterator<Item = FlightData> + use<>, FlightData)> {
max_flight_data_size: usize,
) -> Result<(impl Iterator<Item = FlightData> + 'static, FlightData)> {
self.compression_context
.scratch
.reserve(max_flight_data_size);
let (encoded_dictionaries, encoded_batch) = self.data_gen.encode(
batch,
&mut self.dictionary_tracker,
Expand All @@ -733,7 +737,7 @@ impl FlightIpcEncoder {
)?;

let flight_dictionaries = encoded_dictionaries.into_iter().map(|e| e.into());
let flight_batch = encoded_batch.into();
let flight_batch: FlightData = encoded_batch.into();

Ok((flight_dictionaries, flight_batch))
}
Expand Down Expand Up @@ -1833,7 +1837,7 @@ mod tests {
) -> (Vec<FlightData>, FlightData) {
let data_gen = IpcDataGenerator::default();
let mut dictionary_tracker = DictionaryTracker::new(false);
let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

let (encoded_dictionaries, encoded_batch) = data_gen
.encode(
Expand Down
4 changes: 2 additions & 2 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::writer::CompressionContext;
use arrow_ipc::writer::IpcWriteContext;
use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema, SchemaRef};

Expand Down Expand Up @@ -92,7 +92,7 @@ pub fn batches_to_flight_data(

let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);
let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

for batch in batches.iter() {
let (encoded_dictionaries, encoded_batch) = data_gen.encode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::{
datatypes::SchemaRef,
ipc::{
self, reader,
writer::{self, CompressionContext},
writer::{self, IpcWriteContext},
},
record_batch::RecordBatch,
};
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn upload_data(

let mut original_data_iter = original_data.iter().enumerate();

let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

if let Some((counter, first_batch)) = original_data_iter.next() {
let metadata = counter.to_string().into_bytes();
Expand Down Expand Up @@ -159,7 +159,7 @@ async fn send_batch(
batch: &RecordBatch,
options: &writer::IpcWriteOptions,
dictionary_tracker: &mut writer::DictionaryTracker,
compression_context: &mut CompressionContext,
compression_context: &mut IpcWriteContext,
) -> Result {
let data_gen = writer::IpcDataGenerator::default();

Expand Down
28 changes: 15 additions & 13 deletions arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
use crate::CompressionType;
use arrow_buffer::Buffer;
use arrow_schema::ArrowError;
use flatbuffers::FlatBufferBuilder;

const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;

/// Additional context that may be needed for compression.
///
/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent
/// compression calls to avoid the performance overhead of initialising a new context for every
/// compression.
/// - The flatbuffer builder (`fbb`) is reset and reused across calls.
/// - The zstd compressor (when enabled) is kept alive to avoid re-initialisation overhead.
#[derive(Default)]
pub struct CompressionContext {
pub struct IpcWriteContext {
Comment thread
Rich-T-kid marked this conversation as resolved.
#[cfg(feature = "zstd")]
compressor: Option<zstd::bulk::Compressor<'static>>,
pub(crate) fbb: FlatBufferBuilder<'static>,
Comment thread
Rich-T-kid marked this conversation as resolved.
/// Scratch buffer for the IPC arrow data body. When set by the caller before
/// encode(), the existing allocation is reused instead of creating a fresh Vec.
pub scratch: Vec<u8>,
}

impl CompressionContext {
impl IpcWriteContext {
#[cfg(feature = "zstd")]
fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
self.compressor.get_or_insert_with(|| {
Expand All @@ -43,9 +45,9 @@ impl CompressionContext {
}
}

impl std::fmt::Debug for CompressionContext {
impl std::fmt::Debug for IpcWriteContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("CompressionContext");
let mut ds = f.debug_struct("IpcWriteContext");

#[cfg(feature = "zstd")]
ds.field(
Expand Down Expand Up @@ -143,7 +145,7 @@ impl CompressionCodec {
&self,
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<usize, ArrowError> {
let uncompressed_data_len = input.len();
let original_output_len = output.len();
Expand Down Expand Up @@ -209,7 +211,7 @@ impl CompressionCodec {
&self,
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
match self {
CompressionCodec::Lz4Frame => compress_lz4(input, output),
Expand Down Expand Up @@ -278,7 +280,7 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result<Vec<u8>, A
fn compress_zstd(
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
let result = context.zstd_compressor().compress(input)?;
output.extend_from_slice(&result);
Expand All @@ -290,7 +292,7 @@ fn compress_zstd(
fn compress_zstd(
_input: &[u8],
_output: &mut Vec<u8>,
_context: &mut CompressionContext,
_context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
Err(ArrowError::InvalidArgumentError(
"zstd IPC compression requires the zstd feature".to_string(),
Expand Down
Loading
Loading