Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::CompressionType;
use arrow_buffer::Buffer;
use arrow_schema::ArrowError;
use flatbuffers::FlatBufferBuilder;

const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;
Expand All @@ -26,14 +27,20 @@ const LENGTH_OF_PREFIX_DATA: i64 = 8;
///
/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent
/// compression calls to avoid the performance overhead of initialising a new context for every
/// compression.
/// compression. Also holds a [`FlatBufferBuilder`] that is reused across IPC writes.
#[derive(Default)]
pub struct CompressionContext {
fbb: FlatBufferBuilder<'static>,
#[cfg(feature = "zstd")]
compressor: Option<zstd::bulk::Compressor<'static>>,
}

impl CompressionContext {
/// Get a mutable reference to the [`FlatBufferBuilder`] that is reused across IPC writes.
pub(crate) fn mut_fbb(&mut self) -> &mut FlatBufferBuilder<'static> {
&mut self.fbb
}

#[cfg(feature = "zstd")]
fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
self.compressor.get_or_insert_with(|| {
Expand Down
33 changes: 17 additions & 16 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,11 @@ impl IpcDataGenerator {
compression_context: &mut CompressionContext,
sink: &mut IpcBodySink<'_>,
) -> Result<(Vec<u8>, usize, usize), ArrowError> {
let mut fbb = FlatBufferBuilder::new();

let batch_compression_type = write_options.batch_compression_type;

let compression = batch_compression_type.map(|batch_compression_type| {
let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
let fbb = compression_context.mut_fbb();
let mut c = crate::BodyCompressionBuilder::new(fbb);
c.add_method(crate::BodyCompressionMethod::BUFFER);
c.add_codec(batch_compression_type);
c.finish()
Expand Down Expand Up @@ -728,6 +727,7 @@ impl IpcDataGenerator {
let tail_pad = pad_to_alignment(alignment, offset as usize);
let body_len = offset as usize + tail_pad;

let fbb = compression_context.mut_fbb();
let buffers = fbb.create_vector(&meta.buffers);
let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
Expand All @@ -737,7 +737,7 @@ impl IpcDataGenerator {
};

let root = {
let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
Expand All @@ -749,16 +749,17 @@ impl IpcDataGenerator {
}
batch_builder.finish().as_union_value()
};
// create an crate::Message
let mut message = crate::MessageBuilder::new(&mut fbb);
let mut message = crate::MessageBuilder::new(fbb);
message.add_version(write_options.metadata_version);
message.add_header_type(crate::MessageHeader::RecordBatch);
message.add_bodyLength(body_len as i64);
message.add_header(root);
let root = message.finish();
fbb.finish(root, None);

Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
let ipc_message = fbb.finished_data().to_vec();
fbb.reset();
Ok((ipc_message, body_len, tail_pad))
}

/// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
Expand All @@ -771,15 +772,14 @@ impl IpcDataGenerator {
is_delta: bool,
compression_context: &mut CompressionContext,
) -> Result<EncodedData, ArrowError> {
let mut fbb = FlatBufferBuilder::new();

let mut arrow_data: Vec<u8> = vec![];

// get the type of compression
let batch_compression_type = write_options.batch_compression_type;

let compression = batch_compression_type.map(|batch_compression_type| {
let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
let fbb = compression_context.mut_fbb();
let mut c = crate::BodyCompressionBuilder::new(fbb);
c.add_method(crate::BodyCompressionMethod::BUFFER);
c.add_codec(batch_compression_type);
c.finish()
Expand Down Expand Up @@ -810,7 +810,7 @@ impl IpcDataGenerator {
let body_len = offset as usize + tail_pad;
arrow_data.extend_from_slice(&PADDING[..tail_pad]);

// write data
let fbb = compression_context.mut_fbb();
let buffers = fbb.create_vector(&meta.buffers);
let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
Expand All @@ -820,7 +820,7 @@ impl IpcDataGenerator {
};

let root = {
let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
batch_builder.add_length(array_data.len() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
Expand All @@ -834,15 +834,15 @@ impl IpcDataGenerator {
};

let root = {
let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb);
batch_builder.add_id(dict_id);
batch_builder.add_data(root);
batch_builder.add_isDelta(is_delta);
batch_builder.finish().as_union_value()
};

let root = {
let mut message_builder = crate::MessageBuilder::new(&mut fbb);
let mut message_builder = crate::MessageBuilder::new(fbb);
message_builder.add_version(write_options.metadata_version);
message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
message_builder.add_bodyLength(body_len as i64);
Expand All @@ -851,10 +851,11 @@ impl IpcDataGenerator {
};

fbb.finish(root, None);
let finished_data = fbb.finished_data();
let ipc_message = fbb.finished_data().to_vec();
fbb.reset();

Ok(EncodedData {
ipc_message: finished_data.to_vec(),
ipc_message,
arrow_data,
})
}
Expand Down
Loading