Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
use crate::{FlightData, FlightDescriptor, SchemaAsIpc, error::Result};

use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray};
use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_ipc::writer::{IpcWriteContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};

use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode};
use bytes::Bytes;
Expand Down Expand Up @@ -701,7 +701,7 @@ struct FlightIpcEncoder {
options: IpcWriteOptions,
data_gen: IpcDataGenerator,
dictionary_tracker: DictionaryTracker,
compression_context: CompressionContext,
compression_context: IpcWriteContext,
}

impl FlightIpcEncoder {
Expand All @@ -710,7 +710,7 @@ impl FlightIpcEncoder {
options,
data_gen: IpcDataGenerator::default(),
dictionary_tracker: DictionaryTracker::new(error_on_replacement),
compression_context: CompressionContext::default(),
compression_context: IpcWriteContext::default(),
}
}

Expand Down Expand Up @@ -1833,7 +1833,7 @@ mod tests {
) -> (Vec<FlightData>, FlightData) {
let data_gen = IpcDataGenerator::default();
let mut dictionary_tracker = DictionaryTracker::new(false);
let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

let (encoded_dictionaries, encoded_batch) = data_gen
.encode(
Expand Down
4 changes: 2 additions & 2 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::writer::CompressionContext;
use arrow_ipc::writer::IpcWriteContext;
use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema, SchemaRef};

Expand Down Expand Up @@ -92,7 +92,7 @@ pub fn batches_to_flight_data(

let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);
let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

for batch in batches.iter() {
let (encoded_dictionaries, encoded_batch) = data_gen.encode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::{
datatypes::SchemaRef,
ipc::{
self, reader,
writer::{self, CompressionContext},
writer::{self, IpcWriteContext},
},
record_batch::RecordBatch,
};
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn upload_data(

let mut original_data_iter = original_data.iter().enumerate();

let mut compression_context = CompressionContext::default();
let mut compression_context = IpcWriteContext::default();

if let Some((counter, first_batch)) = original_data_iter.next() {
let metadata = counter.to_string().into_bytes();
Expand Down Expand Up @@ -159,7 +159,7 @@ async fn send_batch(
batch: &RecordBatch,
options: &writer::IpcWriteOptions,
dictionary_tracker: &mut writer::DictionaryTracker,
compression_context: &mut CompressionContext,
compression_context: &mut IpcWriteContext,
) -> Result {
let data_gen = writer::IpcDataGenerator::default();

Expand Down
22 changes: 14 additions & 8 deletions arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ const LENGTH_OF_PREFIX_DATA: i64 = 8;
/// compression calls to avoid the performance overhead of initialising a new context for every
/// compression.
#[derive(Default)]
pub struct CompressionContext {
pub struct IpcWriteContext {
#[allow(dead_code)]
pub(crate) scratch: Vec<u8>,
#[cfg(feature = "zstd")]
compressor: Option<zstd::bulk::Compressor<'static>>,
}

impl CompressionContext {
impl IpcWriteContext {
#[cfg(feature = "zstd")]
fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
self.compressor.get_or_insert_with(|| {
Expand All @@ -43,9 +45,9 @@ impl CompressionContext {
}
}

impl std::fmt::Debug for CompressionContext {
impl std::fmt::Debug for IpcWriteContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("CompressionContext");
let mut ds = f.debug_struct("IpcWriteContext");

#[cfg(feature = "zstd")]
ds.field(
Expand All @@ -57,6 +59,10 @@ impl std::fmt::Debug for CompressionContext {
}
}

/// Deprecated alias for [`IpcWriteContext`].
#[deprecated(since = "57.0.0", note = "Use IpcWriteContext instead")]
pub type CompressionContext = IpcWriteContext;

/// Additional context that may be needed for decompression.
///
/// In the case of zstd, this will contain the zstd decompression context, which can be reused
Expand Down Expand Up @@ -143,7 +149,7 @@ impl CompressionCodec {
&self,
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<usize, ArrowError> {
let uncompressed_data_len = input.len();
let original_output_len = output.len();
Expand Down Expand Up @@ -209,7 +215,7 @@ impl CompressionCodec {
&self,
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
match self {
CompressionCodec::Lz4Frame => compress_lz4(input, output),
Expand Down Expand Up @@ -278,7 +284,7 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result<Vec<u8>, A
fn compress_zstd(
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
let result = context.zstd_compressor().compress(input)?;
output.extend_from_slice(&result);
Expand All @@ -290,7 +296,7 @@ fn compress_zstd(
fn compress_zstd(
_input: &[u8],
_output: &mut Vec<u8>,
_context: &mut CompressionContext,
_context: &mut IpcWriteContext,
) -> Result<(), ArrowError> {
Err(ArrowError::InvalidArgumentError(
"zstd IPC compression requires the zstd feature".to_string(),
Expand Down
Loading
Loading