diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 6c17669956b7..9679de0edbd6 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll}; use crate::{FlightData, FlightDescriptor, SchemaAsIpc, error::Result}; use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray}; -use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteContext, IpcWriteOptions}; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode}; use bytes::Bytes; @@ -701,7 +701,7 @@ struct FlightIpcEncoder { options: IpcWriteOptions, data_gen: IpcDataGenerator, dictionary_tracker: DictionaryTracker, - compression_context: CompressionContext, + compression_context: IpcWriteContext, } impl FlightIpcEncoder { @@ -710,7 +710,7 @@ impl FlightIpcEncoder { options, data_gen: IpcDataGenerator::default(), dictionary_tracker: DictionaryTracker::new(error_on_replacement), - compression_context: CompressionContext::default(), + compression_context: IpcWriteContext::default(), } } @@ -1833,7 +1833,7 @@ mod tests { ) -> (Vec, FlightData) { let data_gen = IpcDataGenerator::default(); let mut dictionary_tracker = DictionaryTracker::new(false); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); let (encoded_dictionaries, encoded_batch) = data_gen .encode( diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 6effb5f86aaf..796688100797 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; use arrow_ipc::convert::fb_to_schema; -use arrow_ipc::writer::CompressionContext; +use arrow_ipc::writer::IpcWriteContext; use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions}; use arrow_schema::{ArrowError, Schema, SchemaRef}; @@ -92,7 +92,7 @@ pub fn batches_to_flight_data( let data_gen = writer::IpcDataGenerator::default(); let mut dictionary_tracker = writer::DictionaryTracker::new(false); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); for batch in batches.iter() { let (encoded_dictionaries, encoded_batch) = data_gen.encode( diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index 05ca5627ecd8..3c01410e3873 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -26,7 +26,7 @@ use arrow::{ datatypes::SchemaRef, ipc::{ self, reader, - writer::{self, CompressionContext}, + writer::{self, IpcWriteContext}, }, record_batch::RecordBatch, }; @@ -95,7 +95,7 @@ async fn upload_data( let mut original_data_iter = original_data.iter().enumerate(); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); if let Some((counter, first_batch)) = original_data_iter.next() { let metadata = counter.to_string().into_bytes(); @@ -159,7 +159,7 @@ async fn send_batch( batch: &RecordBatch, options: &writer::IpcWriteOptions, dictionary_tracker: &mut writer::DictionaryTracker, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result { let data_gen = writer::IpcDataGenerator::default(); diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 45558adb44ba..eb8041d644c6 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -29,12 +29,14 @@ const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3; /// compression calls to avoid the performance overhead of initialising a new context for every /// compression. #[derive(Default)] -pub struct CompressionContext { +pub struct IpcWriteContext { + #[expect(dead_code)] + pub(crate) scratch: Vec, #[cfg(feature = "zstd")] compressor: Option>, } -impl CompressionContext { +impl IpcWriteContext { #[cfg(feature = "zstd")] fn zstd_compressor(&mut self, level: i32) -> &mut zstd::bulk::Compressor<'static> { self.compressor.get_or_insert_with(|| { @@ -43,9 +45,9 @@ impl CompressionContext { } } -impl std::fmt::Debug for CompressionContext { +impl std::fmt::Debug for IpcWriteContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("CompressionContext"); + let mut ds = f.debug_struct("IpcWriteContext"); #[cfg(feature = "zstd")] ds.field( @@ -57,6 +59,10 @@ impl std::fmt::Debug for CompressionContext { } } +/// Deprecated alias for [`IpcWriteContext`]. +#[deprecated(since = "59.1.0", note = "Use IpcWriteContext instead")] +pub type CompressionContext = IpcWriteContext; + /// Additional context that may be needed for decompression. /// /// In the case of zstd, this will contain the zstd decompression context, which can be reused @@ -162,7 +168,7 @@ impl CompressionCodec { &self, input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, ) -> Result { let uncompressed_data_len = input.len(); let original_output_len = output.len(); @@ -228,7 +234,7 @@ impl CompressionCodec { &self, input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match self { CompressionCodec::Lz4Frame => compress_lz4(input, output), @@ -297,7 +303,7 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result, A fn compress_zstd( input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, level: i32, ) -> Result<(), ArrowError> { let result = context.zstd_compressor(level).compress(input)?; @@ -310,7 +316,7 @@ fn compress_zstd( fn compress_zstd( _input: &[u8], _output: &mut Vec, - _context: &mut CompressionContext, + _context: &mut IpcWriteContext, _level: i32, ) -> Result<(), ArrowError> { Err(ArrowError::InvalidArgumentError( diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 9ab86807d4d1..271723423708 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -43,7 +43,8 @@ use arrow_schema::*; use crate::CONTINUATION_MARKER; use crate::compression::CompressionCodec; -pub use crate::compression::CompressionContext; +#[expect(deprecated)] +pub use crate::compression::{CompressionContext, IpcWriteContext}; use crate::convert::IpcSchemaEncoder; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] @@ -306,7 +307,7 @@ impl Default for IpcWriteOptions { /// # use std::sync::Arc; /// # use arrow_array::UInt64Array; /// # use arrow_array::RecordBatch; -/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +/// # use arrow_ipc::writer::{IpcWriteContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; /// /// // Create a record batch /// let batch = RecordBatch::try_from_iter(vec![ @@ -318,13 +319,13 @@ impl Default for IpcWriteOptions { /// let options = IpcWriteOptions::default(); /// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); /// -/// let mut compression_context = CompressionContext::default(); +/// let mut ipc_write_context = IpcWriteContext::default(); /// /// // encode the batch into zero or more encoded dictionaries /// // and the data for the actual array. /// let data_gen = IpcDataGenerator::default(); /// let (encoded_dictionaries, encoded_message) = data_gen -/// .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context) +/// .encode(&batch, &mut dictionary_tracker, &options, &mut ipc_write_context) /// .unwrap(); /// # } /// ``` @@ -372,7 +373,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Struct(fields) => { @@ -385,7 +386,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } } @@ -407,7 +408,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::List(field) => { @@ -419,7 +420,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::LargeList(field) => { @@ -431,7 +432,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::ListView(field) => { @@ -443,7 +444,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::LargeListView(field) => { @@ -455,7 +456,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::FixedSizeList(field, _) => { @@ -470,7 +471,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::Map(field, _) => { @@ -489,7 +490,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; // values @@ -500,7 +501,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } DataType::Union(fields, _) => { @@ -514,7 +515,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, + ipc_write_context, )?; } } @@ -533,7 +534,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { @@ -548,7 +549,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, - compression_context, + ipc_write_context, )?; // It's important to only take the dict_id at this point, because the dict ID @@ -575,7 +576,7 @@ impl IpcDataGenerator { dict_values, write_options, false, - compression_context, + ipc_write_context, )?); } DictionaryUpdate::Delta(data) => { @@ -584,7 +585,7 @@ impl IpcDataGenerator { &data, write_options, true, - compression_context, + ipc_write_context, )?); } } @@ -595,7 +596,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, - compression_context, + ipc_write_context, )?, } @@ -610,19 +611,15 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, ) -> Result<(Vec, EncodedData), ArrowError> { - let encoded_dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; + let encoded_dictionaries = + self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?; let mut arrow_data = Vec::new(); let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( batch, write_options, - compression_context, + ipc_write_context, &mut IpcBodySink::Write(&mut arrow_data), )?; arrow_data.extend_from_slice(&PADDING[..tail_pad]); @@ -641,7 +638,7 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); @@ -654,7 +651,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, &mut dict_id, - compression_context, + ipc_write_context, )?; } Ok(encoded_dictionaries) @@ -668,15 +665,11 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, writer: &mut W, ) -> Result { - let encoded_dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; + let encoded_dictionaries = + self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?; let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len()); for dict in encoded_dictionaries { @@ -692,7 +685,7 @@ impl IpcDataGenerator { let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes( batch, write_options, - compression_context, + ipc_write_context, &mut IpcBodySink::Collect(&mut encoded_buffers), )?; @@ -751,7 +744,7 @@ impl IpcDataGenerator { &self, batch: &RecordBatch, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, sink: &mut IpcBodySink<'_>, ) -> Result<(Vec, usize, usize), ArrowError> { let mut fbb = FlatBufferBuilder::new(); @@ -788,7 +781,7 @@ impl IpcDataGenerator { sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); @@ -838,7 +831,7 @@ impl IpcDataGenerator { array_data: &ArrayData, write_options: &IpcWriteOptions, is_delta: bool, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, ) -> Result { let mut fbb = FlatBufferBuilder::new(); @@ -873,7 +866,7 @@ impl IpcDataGenerator { &mut sink, 0, compression_codec, - compression_context, + ipc_write_context, write_options, )?; @@ -1313,7 +1306,7 @@ pub struct FileWriter { data_gen: IpcDataGenerator, - compression_context: CompressionContext, + ipc_write_context: IpcWriteContext, } impl FileWriter> { @@ -1375,7 +1368,7 @@ impl FileWriter { dictionary_tracker, custom_metadata: HashMap::new(), data_gen, - compression_context: CompressionContext::default(), + ipc_write_context: IpcWriteContext::default(), }) } @@ -1396,7 +1389,7 @@ impl FileWriter { batch, &mut self.dictionary_tracker, &self.write_options, - &mut self.compression_context, + &mut self.ipc_write_context, &mut self.writer, )?; @@ -1604,7 +1597,7 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, - compression_context: CompressionContext, + ipc_write_context: IpcWriteContext, } impl StreamWriter> { @@ -1655,7 +1648,7 @@ impl StreamWriter { finished: false, dictionary_tracker, data_gen, - compression_context: CompressionContext::default(), + ipc_write_context: IpcWriteContext::default(), }) } @@ -1671,7 +1664,7 @@ impl StreamWriter { batch, &mut self.dictionary_tracker, &self.write_options, - &mut self.compression_context, + &mut self.ipc_write_context, &mut self.writer, )?; Ok(()) @@ -2032,7 +2025,7 @@ fn write_array_data( sink: &mut IpcBodySink<'_>, offset: i64, compression_codec: Option, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, write_options: &IpcWriteOptions, ) -> Result { let mut offset = offset; @@ -2066,7 +2059,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } @@ -2081,7 +2074,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } @@ -2099,7 +2092,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; @@ -2110,7 +2103,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } @@ -2123,7 +2116,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } @@ -2144,7 +2137,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } else if matches!(data_type, DataType::Boolean) { @@ -2160,7 +2153,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } else if matches!( @@ -2183,7 +2176,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; offset = write_array_data( @@ -2192,7 +2185,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; return Ok(offset); @@ -2215,7 +2208,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; offset = encode_sink_buffer( @@ -2224,7 +2217,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; @@ -2234,7 +2227,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; return Ok(offset); @@ -2252,7 +2245,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; return Ok(offset); @@ -2264,7 +2257,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options.alignment, )?; } @@ -2284,7 +2277,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; } @@ -2299,7 +2292,7 @@ fn write_array_data( sink, offset, compression_codec, - compression_context, + ipc_write_context, write_options, )?; } @@ -2316,8 +2309,8 @@ fn write_array_data( /// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec` for /// in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming. /// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry. -/// - `compression_codec` / `compression_context`: if `Some`, the buffer is compressed before -/// writing; `compression_context` provides reusable scratch space across calls. +/// - `compression_codec` / `ipc_write_context`: if `Some`, the buffer is compressed before +/// writing; `ipc_write_context` provides reusable scratch space across calls. /// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned. /// /// Returns the updated `offset` (advanced by the encoded length plus any alignment padding). @@ -2327,7 +2320,7 @@ fn encode_sink_buffer( sink: &mut IpcBodySink<'_>, offset: i64, compression_codec: Option, - compression_context: &mut CompressionContext, + ipc_write_context: &mut IpcWriteContext, alignment: u8, ) -> Result { let (encoded, len) = match compression_codec { @@ -2338,7 +2331,7 @@ fn encode_sink_buffer( Some(codec) => { let mut scratch = Vec::new(); let written = - codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?; + codec.compress_to_vec(buffer.as_slice(), &mut scratch, ipc_write_context)?; let len = i64::try_from(written) .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?; (EncodedBuffer::Compressed(scratch), len) @@ -4523,7 +4516,7 @@ mod tests { let data_gen = IpcDataGenerator::default(); let mut dictionary_tracker = DictionaryTracker::new(false); let writer_options = IpcWriteOptions::default(); - let mut compression_ctx = CompressionContext::default(); + let mut compression_ctx = IpcWriteContext::default(); let schema = Arc::new(Schema::new(vec![Field::new( "a",