diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 8f8220e470f..c910f728da7 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -33,6 +33,7 @@ pub(crate) struct VortexFileCache { file_cache: Cache, segment_cache: Cache, session: VortexSession, + footer_initial_read_size_bytes: usize, } /// Cache key for a [`VortexFile`]. @@ -59,7 +60,12 @@ struct SegmentKey { } impl VortexFileCache { - pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSession) -> Self { + pub fn new( + size_mb: usize, + segment_size_mb: usize, + footer_initial_read_size_bytes: usize, + session: VortexSession, + ) -> Self { let file_cache = Cache::builder() .max_capacity(size_mb as u64 * (1 << 20)) .eviction_listener(|k: Arc, _v: VortexFile, cause| { @@ -82,9 +88,15 @@ impl VortexFileCache { file_cache, segment_cache, session, + footer_initial_read_size_bytes, } } + #[cfg(test)] + pub(crate) fn footer_initial_read_size_bytes(&self) -> usize { + self.footer_initial_read_size_bytes + } + pub async fn try_get( &self, object: &ObjectMeta, @@ -101,6 +113,7 @@ impl VortexFileCache { .metrics() .child_with_tags([("filename", object.location.to_string())]), ) + .with_initial_read_size(self.footer_initial_read_size_bytes) .with_file_size(object.size) .with_segment_cache(Arc::new(VortexFileSegmentCache { file_key, diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index f8a64c0320c..f1b11e0a45b 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -51,6 +51,8 @@ use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::stats; use vortex::expr::stats::Stat; +use vortex::file::EOF_SIZE; +use vortex::file::MAX_POSTSCRIPT_SIZE; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::scalar::Scalar; use vortex::session::VortexSession; @@ -61,6 +63,8 @@ use super::source::VortexSource; use crate::PrecisionExt as _; use crate::convert::TryToDataFusion; +const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE; + /// Vortex implementation of a DataFusion [`FileFormat`]. pub struct VortexFormat { session: VortexSession, @@ -87,6 +91,11 @@ config_namespace! { pub footer_cache_size_mb: usize, default = 64 /// The size of the in-memory segment cache. pub segment_cache_size_mb: usize, default = 0 + /// The number of bytes to read when parsing a file footer. + /// + /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum + /// during footer parsing. + pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES } } @@ -186,6 +195,7 @@ impl VortexFormat { file_cache: VortexFileCache::new( opts.footer_cache_size_mb, opts.segment_cache_size_mb, + opts.footer_initial_read_size_bytes, session, ), opts, @@ -468,7 +478,7 @@ mod tests { (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \ STORED AS vortex \ LOCATION '{}' \ - OPTIONS( segment_cache_size_mb '5' );", + OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345' );", dir.path().to_str().unwrap() )) .await @@ -477,4 +487,13 @@ mod tests { .await .unwrap(); } + + #[test] + fn format_plumbs_footer_initial_read_size() { + let mut opts = VortexOptions::default(); + opts.set("footer_initial_read_size_bytes", "12345").unwrap(); + + let format = VortexFormat::new_with_options(VortexSession::default(), opts); + assert_eq!(format.file_cache.footer_initial_read_size_bytes(), 12345); + } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index b5474f7bc1e..71235d943ea 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -489,7 +489,7 @@ mod tests { // no adapter expr_adapter_factory, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema, batch_size: 100, limit: None, @@ -635,7 +635,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: expr_adapter_factory.clone(), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, limit: None, @@ -719,7 +719,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: TableSchema::from_file_schema(table_schema.clone()), batch_size: 100, limit: None, @@ -870,7 +870,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, limit: None, @@ -927,7 +927,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: TableSchema::from_file_schema(schema), batch_size: 100, limit: None, diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 1ee6d08feb2..eb7261d92db 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -36,7 +36,7 @@ use crate::footer::Footer; use crate::segments::FileSegmentSource; use crate::segments::InitialReadSegmentCache; -const INITIAL_READ_SIZE: usize = 1 << 20; // 1 MB +const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE; /// Open options for a Vortex file reader. pub struct VortexOpenOptions { @@ -281,3 +281,106 @@ impl VortexOpenOptions { .await } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use futures::future::BoxFuture; + use vortex_array::IntoArray; + use vortex_array::expr::session::ExprSession; + use vortex_array::session::ArraySession; + use vortex_buffer::Buffer; + use vortex_buffer::ByteBufferMut; + use vortex_io::session::RuntimeSession; + use vortex_layout::session::LayoutSession; + + use super::*; + use crate::WriteOptionsSessionExt; + + // Define CountingReadAt struct + struct CountingReadAt { + inner: R, + total_read: Arc, + first_read_len: Arc, + } + + impl VortexReadAt for CountingReadAt { + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.total_read.fetch_add(length, Ordering::Relaxed); + let _ = self.first_read_len.compare_exchange( + 0, + length, + Ordering::Relaxed, + Ordering::Relaxed, + ); + self.inner.read_at(offset, length, alignment) + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.inner.size() + } + } + + #[tokio::test] + async fn test_initial_read_size() { + // Create a large file (> 1MB) + let mut buf = ByteBufferMut::empty(); + let mut session = VortexSession::empty() + .with::() + .with::() + .with::() + .with::() + .with::(); + + crate::register_default_encodings(&mut session); + + // 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding. + let array = Buffer::from( + (0i32..1_500_000) + .map(|i| if i % 2 == 0 { i } else { -i }) + .collect::>(), + ) + .into_array(); + + session + .write_options() + .write(&mut buf, array.to_array_stream()) + .await + .unwrap(); + + let buffer = ByteBuffer::from(buf); + assert!( + buffer.len() > 1024 * 1024, + "Buffer length is only {} bytes", + buffer.len() + ); + + let total_read = Arc::new(AtomicUsize::new(0)); + let first_read_len = Arc::new(AtomicUsize::new(0)); + let reader = CountingReadAt { + inner: buffer, + total_read: total_read.clone(), + first_read_len: first_read_len.clone(), + }; + + // Open the file + let _file = session.open_options().open_read_at(reader).await.unwrap(); + + // Assert that we read approximately the postscript size, not 1MB + let first = first_read_len.load(Ordering::Relaxed); + assert_eq!( + first, + MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE, + "Read exactly the postscript size" + ); + let read = total_read.load(Ordering::Relaxed); + assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read); + } +} diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index a6b43834254..f1527f0ea71 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -2,9 +2,9 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fs::File; -#[cfg(not(unix))] +#[cfg(all(not(unix), not(windows)))] use std::io::Read; -#[cfg(not(unix))] +#[cfg(all(not(unix), not(windows)))] use std::io::Seek; #[cfg(unix)] use std::os::unix::fs::FileExt; @@ -37,7 +37,22 @@ pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std: { file.read_exact_at(buffer, offset) } - #[cfg(not(unix))] + #[cfg(windows)] + { + let mut bytes_read = 0; + while bytes_read < buffer.len() { + let read = file.seek_read(&mut buffer[bytes_read..], offset + bytes_read as u64)?; + if read == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )); + } + bytes_read += read; + } + Ok(()) + } + #[cfg(all(not(unix), not(windows)))] { use std::io::SeekFrom; let mut file_ref = file; diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index 8489d4230f5..fd91c0608a7 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -2,10 +2,15 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::ops::Range; +use std::pin::Pin; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use futures::Stream; +use futures::StreamExt; use futures::future::BoxFuture; +use futures::stream::BoxStream; use itertools::Itertools; use vortex_array::ArrayRef; use vortex_array::expr::Expression; @@ -22,6 +27,7 @@ use vortex_dtype::Field; use vortex_dtype::FieldMask; use vortex_dtype::FieldName; use vortex_dtype::FieldPath; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; @@ -287,7 +293,7 @@ impl ScanBuilder { pub fn into_stream( self, ) -> VortexResult> + Send + 'static + use> { - self.prepare()?.execute_stream(None) + Ok(LazyScanStream::new(self)) } /// Returns an [`Iterator`] using the session's runtime. @@ -300,6 +306,49 @@ impl ScanBuilder { } } +enum LazyScanState { + Builder(Option>>), + Stream(BoxStream<'static, VortexResult>), + Error(Option), +} + +struct LazyScanStream { + state: LazyScanState, +} + +impl LazyScanStream { + fn new(builder: ScanBuilder) -> Self { + Self { + state: LazyScanState::Builder(Some(Box::new(builder))), + } + } +} + +impl Unpin for LazyScanStream {} + +impl Stream for LazyScanStream { + type Item = VortexResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + LazyScanState::Builder(builder) => { + let builder = builder.take().vortex_expect("polled after completion"); + match builder + .prepare() + .and_then(|scan| scan.execute_stream(None).map(|s| s.boxed())) + { + Ok(stream) => self.state = LazyScanState::Stream(stream), + Err(err) => self.state = LazyScanState::Error(Some(err)), + } + } + LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx), + LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)), + } + } + } +} + /// Compute masks of field paths referenced by the projection and filter in the scan. /// /// Projection and filter must be pre-simplified. @@ -338,3 +387,114 @@ pub(crate) fn filter_and_projection_masks( fn to_field_mask(field: FieldName) -> FieldMask { FieldMask::Prefix(FieldPath::from(Field::Name(field))) } + +#[cfg(test)] +mod test { + use std::collections::BTreeSet; + use std::ops::Range; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use vortex_array::MaskFuture; + use vortex_array::expr::Expression; + use vortex_dtype::DType; + use vortex_dtype::FieldMask; + use vortex_dtype::Nullability; + use vortex_dtype::PType; + use vortex_error::VortexResult; + use vortex_io::runtime::BlockingRuntime; + use vortex_io::runtime::single::SingleThreadRuntime; + use vortex_io::session::RuntimeSessionExt; + use vortex_layout::ArrayFuture; + use vortex_layout::LayoutReader; + use vortex_mask::Mask; + + use super::ScanBuilder; + + #[derive(Debug)] + struct CountingLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + register_splits_calls: Arc, + } + + impl CountingLayoutReader { + fn new(register_splits_calls: Arc) -> Self { + Self { + name: Arc::from("counting"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count: 1, + register_splits_calls, + } + } + } + + impl LayoutReader for CountingLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + self.register_splits_calls.fetch_add(1, Ordering::Relaxed); + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + Ok(Box::pin(async move { + unreachable!("scan should not be polled in this test") + })) + } + } + + #[test] + fn into_stream_is_lazy() { + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(CountingLayoutReader::new(calls.clone())); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::SESSION.clone().with_handle(runtime.handle()); + + let _stream = ScanBuilder::new(session, reader).into_stream().unwrap(); + + assert_eq!(calls.load(Ordering::Relaxed), 0); + } +}