From cf490a4b57ac6619e071dd18e9f4e8ae9f2b7bc7 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 19 May 2026 12:09:56 +0200 Subject: [PATCH 1/7] Allow specifying an arrow schema for PartitionedFile --- datafusion/catalog-listing/src/helpers.rs | 1 + .../datasource-parquet/src/opener/mod.rs | 63 +++++++++++++++++-- datafusion/datasource/src/mod.rs | 14 +++++ 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 0389b3cb17fe9..4f83ec4b3730f 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -459,6 +459,7 @@ fn object_meta_to_partitioned_file( ) -> Result> { Ok(Some(PartitionedFile { object_meta, + arrow_schema: None, partition_values: vec![], range: None, statistics: None, diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 09e77638776e5..e6b44f63a45f0 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -713,8 +713,14 @@ impl PreparedParquetOpen { // unnecessary I/O. We decide later if it is needed to evaluate the // pruning predicates. Thus default to not requesting it from the // underlying reader. - let options = - ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); + let options = { + let mut options = + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); + if let Some(schema) = self.partitioned_file.arrow_schema.as_ref() { + options = options.with_schema(schema.to_owned()); + } + options + }; #[cfg(feature = "parquet_encryption")] let mut options = options; #[cfg(feature = "parquet_encryption")] @@ -1411,8 +1417,8 @@ mod test { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, - stats::Precision, + ColumnStatistics, DataFusionError, ScalarValue, Statistics, internal_err, + record_batch, stats::Precision, }; use datafusion_datasource::morsel::{Morsel, Morselizer}; use datafusion_datasource::{PartitionedFile, TableSchema, TableSchemaBuilder}; @@ -2168,6 +2174,55 @@ mod test { assert_eq!(num_rows, 0); } + #[tokio::test] + async fn test_opener_prioritizes_partitioned_file_schema() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!( + ("a", Int32, vec![Some(1), Some(2), Some(2)]), + ("b", Float32, vec![Some(1.0), Some(2.0), None]) + ) + .unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + + let schema = batch.schema(); + let query_file = async |schema: SchemaRef| -> Result<(usize, usize)> { + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ) + .with_arrow_schema(schema.clone()); + + let predicate = logical2physical(&col("a").eq(lit(1)), &schema); + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_predicate(predicate) + .build(); + + let stream = open_file(&opener, file.clone()).await?; + Ok(count_batches_and_rows(stream).await) + }; + + let (num_batches, num_rows) = + query_file(schema.clone()).await.expect("query_file"); + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); + + let mismatching_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Float64, true), + ]); + assert_eq!( + query_file(SchemaRef::new(mismatching_schema)) + .await + .unwrap_err() + .message(), + "Arrow: Incompatible supplied Arrow schema: data type mismatch for field b: requested Float64 but found Float32" + ); + } + #[tokio::test] async fn test_reverse_scan_row_groups() { use parquet::file::properties::WriterProperties; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index b92b4b454676f..9efa66baa658b 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -71,6 +71,7 @@ use std::any::Any; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; +use arrow::datatypes::SchemaRef; /// User-defined per-file extension data, keyed by concrete Rust type. /// @@ -163,12 +164,15 @@ pub struct PartitionedFile { /// The estimated size of the parquet metadata, in bytes pub metadata_size_hint: Option, pub table_reference: Option, + /// A user-provided arrow schema for the file. + pub arrow_schema: Option, } impl PartitionedFile { /// Create a simple file without metadata or partition pub fn new(path: impl Into, size: u64) -> Self { Self { + arrow_schema: None, object_meta: ObjectMeta { location: Path::from(path.into()), last_modified: chrono::Utc.timestamp_nanos(0), @@ -189,6 +193,7 @@ impl PartitionedFile { /// Create a file from a known ObjectMeta without partition pub fn new_from_meta(object_meta: ObjectMeta) -> Self { Self { + arrow_schema: None, object_meta, partition_values: vec![], range: None, @@ -203,6 +208,7 @@ impl PartitionedFile { /// Create a file range without metadata or partition pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self { Self { + arrow_schema: None, object_meta: ObjectMeta { location: Path::from(path), last_modified: chrono::Utc.timestamp_nanos(0), @@ -221,6 +227,12 @@ impl PartitionedFile { .with_range(start, end) } + /// Provide an arrow schema for the file. + pub fn with_arrow_schema(mut self, schema: SchemaRef) -> Self { + self.arrow_schema = Some(schema); + self + } + /// Attach partition values to this file. /// This replaces any existing partition values. pub fn with_partition_values(mut self, partition_values: Vec) -> Self { @@ -376,6 +388,7 @@ impl From for PartitionedFile { fn from(object_meta: ObjectMeta) -> Self { PartitionedFile { object_meta, + arrow_schema: None, partition_values: vec![], range: None, statistics: None, @@ -556,6 +569,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec Date: Tue, 19 May 2026 13:27:56 +0200 Subject: [PATCH 2/7] Fix cargo fmt --- datafusion/datasource/src/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 9efa66baa658b..f8a2e33ad5f41 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -64,6 +64,7 @@ use object_store::{GetOptions, GetRange, ObjectStore}; use object_store::{ObjectMeta, path::Path}; pub use table_schema::{TableSchema, TableSchemaBuilder}; // Remove when add_row_stats is remove +use arrow::datatypes::SchemaRef; #[expect(deprecated)] pub use statistics::add_row_stats; pub use statistics::compute_all_files_statistics; @@ -71,7 +72,6 @@ use std::any::Any; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; -use arrow::datatypes::SchemaRef; /// User-defined per-file extension data, keyed by concrete Rust type. /// From 021b9bbfe8990cb588774158060ad80defd5c7f9 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 19 May 2026 16:26:48 +0200 Subject: [PATCH 3/7] Fix clippy --- datafusion/datasource-parquet/src/opener/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index e6b44f63a45f0..fb36fa4f2b8c7 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -1417,7 +1417,7 @@ mod test { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, DataFusionError, ScalarValue, Statistics, internal_err, + ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, stats::Precision, }; use datafusion_datasource::morsel::{Morsel, Morselizer}; From 9be11f3966f38dbdcd7f900be445ef829f0b875a Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 26 May 2026 12:03:05 +0200 Subject: [PATCH 4/7] Fix cargo fmt --- datafusion/datasource-parquet/src/opener/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index fb36fa4f2b8c7..db83c4db37001 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -1417,8 +1417,8 @@ mod test { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, ScalarValue, Statistics, internal_err, - record_batch, stats::Precision, + ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, + stats::Precision, }; use datafusion_datasource::morsel::{Morsel, Morselizer}; use datafusion_datasource::{PartitionedFile, TableSchema, TableSchemaBuilder}; From d011c64f55e9a698a6774d8cf491b522c461b303 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 26 May 2026 14:14:50 +0200 Subject: [PATCH 5/7] Integrate arrow schema in proto --- datafusion/datasource/src/mod.rs | 15 +++++++-- .../proto-models/proto/datafusion.proto | 1 + .../proto-models/src/generated/pbjson.rs | 18 ++++++++++ .../proto-models/src/generated/prost.rs | 2 ++ .../proto/src/physical_plan/from_proto.rs | 33 +++++++++++++++++++ .../proto/src/physical_plan/to_proto.rs | 5 +++ 6 files changed, 72 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index f8a2e33ad5f41..82030e545a42e 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -164,7 +164,15 @@ pub struct PartitionedFile { /// The estimated size of the parquet metadata, in bytes pub metadata_size_hint: Option, pub table_reference: Option, - /// A user-provided arrow schema for the file. + /// A user-provided physical Arrow schema for this file. + /// + /// This schema describes only the columns stored in the file. It must not + /// include partition columns; those are represented separately by + /// [`Self::partition_values`] and the scan's table partition columns. + /// + /// When provided, this field will be used by the Parquet reader to avoid + /// parsing the Arrow schema from the `ARROW:schema` metadata key. Other + /// built-in file sources ignore it for now. pub arrow_schema: Option, } @@ -227,7 +235,10 @@ impl PartitionedFile { .with_range(start, end) } - /// Provide an arrow schema for the file. + /// Provide a physical Arrow schema for this file. + /// + /// The schema must describe only columns stored in the file and must not + /// include partition columns. See [`Self::arrow_schema`] for details. pub fn with_arrow_schema(mut self, schema: SchemaRef) -> Self { self.arrow_schema = Some(schema); self diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index ea6d078366625..9b066c62af7ef 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1418,6 +1418,7 @@ message PartitionedFile { repeated datafusion_common.ScalarValue partition_values = 4; FileRange range = 5; datafusion_common.Statistics statistics = 6; + datafusion_common.Schema arrow_schema = 7; } message FileRange { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 8e6997757f110..a15840a7e93b1 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -15581,6 +15581,9 @@ impl serde::Serialize for PartitionedFile { if self.statistics.is_some() { len += 1; } + if self.arrow_schema.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.PartitionedFile", len)?; if !self.path.is_empty() { struct_ser.serialize_field("path", &self.path)?; @@ -15604,6 +15607,9 @@ impl serde::Serialize for PartitionedFile { if let Some(v) = self.statistics.as_ref() { struct_ser.serialize_field("statistics", v)?; } + if let Some(v) = self.arrow_schema.as_ref() { + struct_ser.serialize_field("arrowSchema", v)?; + } struct_ser.end() } } @@ -15622,6 +15628,8 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { "partitionValues", "range", "statistics", + "arrow_schema", + "arrowSchema", ]; #[allow(clippy::enum_variant_names)] @@ -15632,6 +15640,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { PartitionValues, Range, Statistics, + ArrowSchema, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -15659,6 +15668,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { "partitionValues" | "partition_values" => Ok(GeneratedField::PartitionValues), "range" => Ok(GeneratedField::Range), "statistics" => Ok(GeneratedField::Statistics), + "arrowSchema" | "arrow_schema" => Ok(GeneratedField::ArrowSchema), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -15684,6 +15694,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { let mut partition_values__ = None; let mut range__ = None; let mut statistics__ = None; + let mut arrow_schema__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Path => { @@ -15726,6 +15737,12 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { } statistics__ = map_.next_value()?; } + GeneratedField::ArrowSchema => { + if arrow_schema__.is_some() { + return Err(serde::de::Error::duplicate_field("arrowSchema")); + } + arrow_schema__ = map_.next_value()?; + } } } Ok(PartitionedFile { @@ -15735,6 +15752,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile { partition_values: partition_values__.unwrap_or_default(), range: range__, statistics: statistics__, + arrow_schema: arrow_schema__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index d8187e65a501e..9a38756fef5e6 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -2106,6 +2106,8 @@ pub struct PartitionedFile { pub range: ::core::option::Option, #[prost(message, optional, tag = "6")] pub statistics: ::core::option::Option, + #[prost(message, optional, tag = "7")] + pub arrow_schema: ::core::option::Option, } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct FileRange { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 55022608e5a70..56a9b52906114 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -720,6 +720,11 @@ impl TryFromProto<&protobuf::PartitionedFile> for PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, ); + if let Some(proto_schema) = val.arrow_schema.as_ref() { + pf = pf.with_arrow_schema(Arc::new( + proto_schema.try_into().map_err(DataFusionError::from)?, + )); + } if let Some(range) = val.range.as_ref() { let file_range = FileRange::try_from_proto(range)?; pf = pf.with_range(file_range.start, file_range.end); @@ -896,9 +901,37 @@ mod tests { assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified); } + #[test] + fn partitioned_file_arrow_schema_roundtrip() { + use arrow::datatypes::{DataType, Field, Schema}; + use std::collections::HashMap; + + let arrow_schema = Arc::new(Schema::new_with_metadata( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([ + ("field_meta".to_string(), "field_value".to_string()), + ])), + ], + HashMap::from([("schema_meta".to_string(), "schema_value".to_string())]), + )); + let pf = PartitionedFile::new("foo/bar.parquet", 10) + .with_arrow_schema(Arc::clone(&arrow_schema)); + + let proto = protobuf::PartitionedFile::try_from(&pf).unwrap(); + assert!(proto.arrow_schema.is_some()); + + let decoded = PartitionedFile::try_from(&proto).unwrap(); + assert_eq!( + decoded.arrow_schema.as_ref().map(|s| s.as_ref()), + Some(arrow_schema.as_ref()) + ); + } + #[test] fn partitioned_file_from_proto_invalid_path() { let proto = protobuf::PartitionedFile { + arrow_schema: None, path: "foo//bar".to_string(), size: 1, last_modified_ns: 0, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index c359f651c0e11..5ae4668c782df 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -585,6 +585,11 @@ impl TryFromProto<&PartitionedFile> for protobuf::PartitionedFile { )) })? as u64; Ok(protobuf::PartitionedFile { + arrow_schema: pf + .arrow_schema + .as_ref() + .map(|s| s.as_ref().try_into()) + .transpose()?, path: pf.object_meta.location.as_ref().to_owned(), size: pf.object_meta.size, last_modified_ns, From 2c719c854dd732be7f814ac17cc1f0d1129fcf5d Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 27 May 2026 15:06:45 +0200 Subject: [PATCH 6/7] Use TryFromProto --- datafusion/proto/src/physical_plan/from_proto.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 56a9b52906114..b497bb339de43 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -918,10 +918,10 @@ mod tests { let pf = PartitionedFile::new("foo/bar.parquet", 10) .with_arrow_schema(Arc::clone(&arrow_schema)); - let proto = protobuf::PartitionedFile::try_from(&pf).unwrap(); + let proto = protobuf::PartitionedFile::try_from_proto(&pf).unwrap(); assert!(proto.arrow_schema.is_some()); - let decoded = PartitionedFile::try_from(&proto).unwrap(); + let decoded = PartitionedFile::try_from_proto(&proto).unwrap(); assert_eq!( decoded.arrow_schema.as_ref().map(|s| s.as_ref()), Some(arrow_schema.as_ref()) From 451c718aa5d51d0c9060aa2b0610ada9d37ed730 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 29 May 2026 11:08:27 +0200 Subject: [PATCH 7/7] Simplify options init --- datafusion/datasource-parquet/src/opener/mod.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index db83c4db37001..6ef4cddd868bb 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -713,14 +713,11 @@ impl PreparedParquetOpen { // unnecessary I/O. We decide later if it is needed to evaluate the // pruning predicates. Thus default to not requesting it from the // underlying reader. - let options = { - let mut options = - ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); - if let Some(schema) = self.partitioned_file.arrow_schema.as_ref() { - options = options.with_schema(schema.to_owned()); - } - options - }; + let mut options = + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip); + if let Some(schema) = self.partitioned_file.arrow_schema.as_ref() { + options = options.with_schema(Arc::clone(schema)); + } #[cfg(feature = "parquet_encryption")] let mut options = options; #[cfg(feature = "parquet_encryption")]