Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion vortex-datafusion/src/persistent/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub(crate) struct VortexFileCache {
file_cache: Cache<FileKey, VortexFile, DefaultHashBuilder>,
segment_cache: Cache<SegmentKey, ByteBuffer, DefaultHashBuilder>,
session: VortexSession,
footer_initial_read_size_bytes: usize,
}

/// Cache key for a [`VortexFile`].
Expand All @@ -59,7 +60,12 @@ struct SegmentKey {
}

impl VortexFileCache {
pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSession) -> Self {
pub fn new(
size_mb: usize,
segment_size_mb: usize,
footer_initial_read_size_bytes: usize,
session: VortexSession,
) -> Self {
let file_cache = Cache::builder()
.max_capacity(size_mb as u64 * (1 << 20))
.eviction_listener(|k: Arc<FileKey>, _v: VortexFile, cause| {
Expand All @@ -82,9 +88,15 @@ impl VortexFileCache {
file_cache,
segment_cache,
session,
footer_initial_read_size_bytes,
}
}

#[cfg(test)]
pub(crate) fn footer_initial_read_size_bytes(&self) -> usize {
self.footer_initial_read_size_bytes
}

pub async fn try_get(
&self,
object: &ObjectMeta,
Expand All @@ -101,6 +113,7 @@ impl VortexFileCache {
.metrics()
.child_with_tags([("filename", object.location.to_string())]),
)
.with_initial_read_size(self.footer_initial_read_size_bytes)
.with_file_size(object.size)
.with_segment_cache(Arc::new(VortexFileSegmentCache {
file_key,
Expand Down
21 changes: 20 additions & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::expr::stats;
use vortex::expr::stats::Stat;
use vortex::file::EOF_SIZE;
use vortex::file::MAX_POSTSCRIPT_SIZE;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
Expand All @@ -61,6 +63,8 @@ use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;

/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
session: VortexSession,
Expand All @@ -87,6 +91,11 @@ config_namespace! {
pub footer_cache_size_mb: usize, default = 64
/// The size of the in-memory segment cache.
pub segment_cache_size_mb: usize, default = 0
/// The number of bytes to read when parsing a file footer.
///
/// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
/// during footer parsing.
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
}
}

Expand Down Expand Up @@ -186,6 +195,7 @@ impl VortexFormat {
file_cache: VortexFileCache::new(
opts.footer_cache_size_mb,
opts.segment_cache_size_mb,
opts.footer_initial_read_size_bytes,
session,
),
opts,
Expand Down Expand Up @@ -468,7 +478,7 @@ mod tests {
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex \
LOCATION '{}' \
OPTIONS( segment_cache_size_mb '5' );",
OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345' );",
dir.path().to_str().unwrap()
))
.await
Expand All @@ -477,4 +487,13 @@ mod tests {
.await
.unwrap();
}

#[test]
fn format_plumbs_footer_initial_read_size() {
let mut opts = VortexOptions::default();
opts.set("footer_initial_read_size_bytes", "12345").unwrap();

let format = VortexFormat::new_with_options(VortexSession::default(), opts);
assert_eq!(format.file_cache.footer_initial_read_size_bytes(), 12345);
}
}
10 changes: 5 additions & 5 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ mod tests {
// no adapter
expr_adapter_factory,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()),
table_schema,
batch_size: 100,
limit: None,
Expand Down Expand Up @@ -635,7 +635,7 @@ mod tests {
file_pruning_predicate: None,
expr_adapter_factory: expr_adapter_factory.clone(),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()),
table_schema: table_schema.clone(),
batch_size: 100,
limit: None,
Expand Down Expand Up @@ -719,7 +719,7 @@ mod tests {
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()),
table_schema: TableSchema::from_file_schema(table_schema.clone()),
batch_size: 100,
limit: None,
Expand Down Expand Up @@ -870,7 +870,7 @@ mod tests {
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()),
table_schema: table_schema.clone(),
batch_size: 100,
limit: None,
Expand Down Expand Up @@ -927,7 +927,7 @@ mod tests {
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()),
table_schema: TableSchema::from_file_schema(schema),
batch_size: 100,
limit: None,
Expand Down
105 changes: 104 additions & 1 deletion vortex-file/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::footer::Footer;
use crate::segments::FileSegmentSource;
use crate::segments::InitialReadSegmentCache;

const INITIAL_READ_SIZE: usize = 1 << 20; // 1 MB
const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;

/// Open options for a Vortex file reader.
pub struct VortexOpenOptions {
Expand Down Expand Up @@ -281,3 +281,106 @@ impl VortexOpenOptions {
.await
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use futures::future::BoxFuture;
use vortex_array::IntoArray;
use vortex_array::expr::session::ExprSession;
use vortex_array::session::ArraySession;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBufferMut;
use vortex_io::session::RuntimeSession;
use vortex_layout::session::LayoutSession;

use super::*;
use crate::WriteOptionsSessionExt;

// Define CountingReadAt struct
struct CountingReadAt<R> {
inner: R,
total_read: Arc<AtomicUsize>,
first_read_len: Arc<AtomicUsize>,
}

impl<R: VortexReadAt> VortexReadAt for CountingReadAt<R> {
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
self.total_read.fetch_add(length, Ordering::Relaxed);
let _ = self.first_read_len.compare_exchange(
0,
length,
Ordering::Relaxed,
Ordering::Relaxed,
);
self.inner.read_at(offset, length, alignment)
}

fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
self.inner.size()
}
}

#[tokio::test]
async fn test_initial_read_size() {
// Create a large file (> 1MB)
let mut buf = ByteBufferMut::empty();
let mut session = VortexSession::empty()
.with::<VortexMetrics>()
.with::<ArraySession>()
.with::<LayoutSession>()
.with::<ExprSession>()
.with::<RuntimeSession>();

crate::register_default_encodings(&mut session);

// 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding.
let array = Buffer::from(
(0i32..1_500_000)
.map(|i| if i % 2 == 0 { i } else { -i })
.collect::<Vec<i32>>(),
)
.into_array();

session
.write_options()
.write(&mut buf, array.to_array_stream())
.await
.unwrap();

let buffer = ByteBuffer::from(buf);
assert!(
buffer.len() > 1024 * 1024,
"Buffer length is only {} bytes",
buffer.len()
);

let total_read = Arc::new(AtomicUsize::new(0));
let first_read_len = Arc::new(AtomicUsize::new(0));
let reader = CountingReadAt {
inner: buffer,
total_read: total_read.clone(),
first_read_len: first_read_len.clone(),
};

// Open the file
let _file = session.open_options().open_read_at(reader).await.unwrap();

// Assert that we read approximately the postscript size, not 1MB
let first = first_read_len.load(Ordering::Relaxed);
assert_eq!(
first,
MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE,
"Read exactly the postscript size"
);
let read = total_read.load(Ordering::Relaxed);
assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read);
}
}
21 changes: 18 additions & 3 deletions vortex-io/src/file/std_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fs::File;
#[cfg(not(unix))]
#[cfg(all(not(unix), not(windows)))]
use std::io::Read;
#[cfg(not(unix))]
#[cfg(all(not(unix), not(windows)))]
use std::io::Seek;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
Expand Down Expand Up @@ -37,7 +37,22 @@ pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std:
{
file.read_exact_at(buffer, offset)
}
#[cfg(not(unix))]
#[cfg(windows)]
{
let mut bytes_read = 0;
while bytes_read < buffer.len() {
let read = file.seek_read(&mut buffer[bytes_read..], offset + bytes_read as u64)?;
if read == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
));
}
bytes_read += read;
}
Ok(())
}
#[cfg(all(not(unix), not(windows)))]
{
use std::io::SeekFrom;
let mut file_ref = file;
Expand Down
Loading
Loading