Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ fn object_meta_to_partitioned_file(
) -> Result<Option<PartitionedFile>> {
Ok(Some(PartitionedFile {
object_meta,
arrow_schema: None,
partition_values: vec![],
range: None,
statistics: None,
Expand Down
54 changes: 53 additions & 1 deletion datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,11 @@ impl PreparedParquetOpen {
// unnecessary I/O. We decide later if it is needed to evaluate the
// pruning predicates. Thus default to not requesting it from the
// underlying reader.
let options =
let mut options =
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
if let Some(schema) = self.partitioned_file.arrow_schema.as_ref() {
options = options.with_schema(Arc::clone(schema));
}
#[cfg(feature = "parquet_encryption")]
let mut options = options;
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -2168,6 +2171,55 @@ mod test {
assert_eq!(num_rows, 0);
}

// Checks that a schema attached with `PartitionedFile::with_arrow_schema` is
// the one supplied to the Parquet reader:
//   * with the schema the file was written with, the scan succeeds and
//     returns one batch holding all three rows;
//   * with a schema declaring `b` as Float64 (the file stores Float32),
//     opening fails with an "Incompatible supplied Arrow schema" error.
#[tokio::test]
async fn test_opener_prioritizes_partitioned_file_schema() {
    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

    let batch = record_batch!(
        ("a", Int32, vec![Some(1), Some(2), Some(2)]),
        ("b", Float32, vec![Some(1.0), Some(2.0), None])
    )
    .unwrap();
    // Take the schema first so the batch itself can be moved into the writer
    // instead of being cloned.
    let schema = batch.schema();
    let data_size = write_parquet(Arc::clone(&store), "test.parquet", batch).await;

    // Opens `test.parquet` with `schema` used both as the per-file Arrow schema
    // and as the scan schema, and returns `(batch count, row count)`.
    let query_file = async |schema: SchemaRef| -> Result<(usize, usize)> {
        let file = PartitionedFile::new(
            "test.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        )
        .with_arrow_schema(Arc::clone(&schema));

        let predicate = logical2physical(&col("a").eq(lit(1)), &schema);
        let opener = ParquetMorselizerBuilder::new()
            .with_store(Arc::clone(&store))
            .with_schema(Arc::clone(&schema))
            .with_predicate(predicate)
            .build();

        // `file` is not needed afterwards, so it is moved rather than cloned.
        let stream = open_file(&opener, file).await?;
        Ok(count_batches_and_rows(stream).await)
    };

    // Matching schema: the file opens and every row is returned.
    let (num_batches, num_rows) = query_file(schema).await.expect("query_file");
    assert_eq!(num_batches, 1);
    assert_eq!(num_rows, 3);

    // Mismatching schema: `b` is requested as Float64 but stored as Float32.
    let mismatching_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Float64, true),
    ]);
    assert_eq!(
        query_file(SchemaRef::new(mismatching_schema))
            .await
            .unwrap_err()
            .message(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field b: requested Float64 but found Float32"
    );
}

#[tokio::test]
async fn test_reverse_scan_row_groups() {
use parquet::file::properties::WriterProperties;
Expand Down
25 changes: 25 additions & 0 deletions datafusion/datasource/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{ObjectMeta, path::Path};
pub use table_schema::{TableSchema, TableSchemaBuilder};
// Remove when add_row_stats is removed
use arrow::datatypes::SchemaRef;
#[expect(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
Expand Down Expand Up @@ -163,12 +164,23 @@ pub struct PartitionedFile {
/// The estimated size of the parquet metadata, in bytes
pub metadata_size_hint: Option<usize>,
pub table_reference: Option<TableReference>,
/// A user-provided physical Arrow schema for this file.
///
/// This schema describes only the columns stored in the file. It must not
/// include partition columns; those are represented separately by
/// [`Self::partition_values`] and the scan's table partition columns.
///
/// When provided, this field will be used by the Parquet reader to avoid
/// parsing the Arrow schema from the `ARROW:schema` metadata key. Other
/// built-in file sources ignore it for now.
pub arrow_schema: Option<SchemaRef>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs a follow-up before merge. PartitionedFile::arrow_schema introduces a new user-provided scan contract, but physical plan proto serialization currently appears to drop it. datafusion/proto/src/physical_plan/to_proto.rs builds protobuf::PartitionedFile without this field, and datafusion/proto/proto/datafusion.proto does not seem to have a schema field for it.

As a result, a Parquet scan that is serialized and deserialized would lose the explicit schema and fall back to parsing ARROW:schema, so the main guarantee from this change would not hold end to end.

Could you please add this field to the proto model and conversions, plus a roundtrip test showing that PartitionedFile::with_arrow_schema(...) survives physical plan or PartitionedFile proto serialization?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I updated the protos to serialize and deserialize the file arrow schema as well. There is a proto test now which verifies the round trip.

}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
Self {
arrow_schema: None,
object_meta: ObjectMeta {
location: Path::from(path.into()),
last_modified: chrono::Utc.timestamp_nanos(0),
Expand All @@ -189,6 +201,7 @@ impl PartitionedFile {
/// Create a file from a known ObjectMeta without partition
pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
Self {
arrow_schema: None,
object_meta,
partition_values: vec![],
range: None,
Expand All @@ -203,6 +216,7 @@ impl PartitionedFile {
/// Create a file range without metadata or partition
pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
Self {
arrow_schema: None,
object_meta: ObjectMeta {
location: Path::from(path),
last_modified: chrono::Utc.timestamp_nanos(0),
Expand All @@ -221,6 +235,15 @@ impl PartitionedFile {
.with_range(start, end)
}

/// Provide a physical Arrow schema for this file.
///
/// The schema must describe only columns stored in the file and must not
/// include partition columns. See [`Self::arrow_schema`] for details.
pub fn with_arrow_schema(mut self, schema: SchemaRef) -> Self {
self.arrow_schema = Some(schema);
self
}

/// Attach partition values to this file.
/// This replaces any existing partition values.
pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
Expand Down Expand Up @@ -376,6 +399,7 @@ impl From<ObjectMeta> for PartitionedFile {
fn from(object_meta: ObjectMeta) -> Self {
PartitionedFile {
object_meta,
arrow_schema: None,
partition_values: vec![],
range: None,
statistics: None,
Expand Down Expand Up @@ -556,6 +580,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGro
let max = (base + range_size) as f64;

let file = PartitionedFile {
arrow_schema: None,
object_meta: ObjectMeta {
location: Path::from(format!("file_{i}.parquet")),
last_modified: chrono::Utc::now(),
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-models/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,7 @@ message PartitionedFile {
repeated datafusion_common.ScalarValue partition_values = 4;
FileRange range = 5;
datafusion_common.Statistics statistics = 6;
datafusion_common.Schema arrow_schema = 7;
}

message FileRange {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-models/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15581,6 +15581,9 @@ impl serde::Serialize for PartitionedFile {
if self.statistics.is_some() {
len += 1;
}
if self.arrow_schema.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("datafusion.PartitionedFile", len)?;
if !self.path.is_empty() {
struct_ser.serialize_field("path", &self.path)?;
Expand All @@ -15604,6 +15607,9 @@ impl serde::Serialize for PartitionedFile {
if let Some(v) = self.statistics.as_ref() {
struct_ser.serialize_field("statistics", v)?;
}
if let Some(v) = self.arrow_schema.as_ref() {
struct_ser.serialize_field("arrowSchema", v)?;
}
struct_ser.end()
}
}
Expand All @@ -15622,6 +15628,8 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
"partitionValues",
"range",
"statistics",
"arrow_schema",
"arrowSchema",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -15632,6 +15640,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
PartitionValues,
Range,
Statistics,
ArrowSchema,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -15659,6 +15668,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
"partitionValues" | "partition_values" => Ok(GeneratedField::PartitionValues),
"range" => Ok(GeneratedField::Range),
"statistics" => Ok(GeneratedField::Statistics),
"arrowSchema" | "arrow_schema" => Ok(GeneratedField::ArrowSchema),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -15684,6 +15694,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
let mut partition_values__ = None;
let mut range__ = None;
let mut statistics__ = None;
let mut arrow_schema__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Path => {
Expand Down Expand Up @@ -15726,6 +15737,12 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
}
statistics__ = map_.next_value()?;
}
GeneratedField::ArrowSchema => {
if arrow_schema__.is_some() {
return Err(serde::de::Error::duplicate_field("arrowSchema"));
}
arrow_schema__ = map_.next_value()?;
}
}
}
Ok(PartitionedFile {
Expand All @@ -15735,6 +15752,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
partition_values: partition_values__.unwrap_or_default(),
range: range__,
statistics: statistics__,
arrow_schema: arrow_schema__,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto-models/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2106,6 +2106,8 @@ pub struct PartitionedFile {
pub range: ::core::option::Option<FileRange>,
#[prost(message, optional, tag = "6")]
pub statistics: ::core::option::Option<super::datafusion_common::Statistics>,
#[prost(message, optional, tag = "7")]
pub arrow_schema: ::core::option::Option<super::datafusion_common::Schema>,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct FileRange {
Expand Down
33 changes: 33 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,11 @@ impl TryFromProto<&protobuf::PartitionedFile> for PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
);
if let Some(proto_schema) = val.arrow_schema.as_ref() {
pf = pf.with_arrow_schema(Arc::new(
proto_schema.try_into().map_err(DataFusionError::from)?,
));
}
if let Some(range) = val.range.as_ref() {
let file_range = FileRange::try_from_proto(range)?;
pf = pf.with_range(file_range.start, file_range.end);
Expand Down Expand Up @@ -896,9 +901,37 @@ mod tests {
assert_eq!(pf2.object_meta.last_modified, pf.object_meta.last_modified);
}

// Converts a `PartitionedFile` carrying an explicit Arrow schema to its
// protobuf form and back, and checks the decoded schema equals the original.
#[test]
fn partitioned_file_arrow_schema_roundtrip() {
    use arrow::datatypes::{DataType, Field, Schema};
    use std::collections::HashMap;

    // Metadata is attached at both the field level and the schema level.
    let field_metadata =
        HashMap::from([("field_meta".to_string(), "field_value".to_string())]);
    let schema_metadata =
        HashMap::from([("schema_meta".to_string(), "schema_value".to_string())]);
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true).with_metadata(field_metadata),
    ];
    let expected = Arc::new(Schema::new_with_metadata(fields, schema_metadata));

    let original = PartitionedFile::new("foo/bar.parquet", 10)
        .with_arrow_schema(Arc::clone(&expected));

    // Native -> proto: the schema must be present in the encoded message.
    let encoded = protobuf::PartitionedFile::try_from_proto(&original).unwrap();
    assert!(encoded.arrow_schema.is_some());

    // Proto -> native: the decoded schema must equal the one we started with.
    let decoded = PartitionedFile::try_from_proto(&encoded).unwrap();
    assert_eq!(decoded.arrow_schema.as_deref(), Some(expected.as_ref()));
}

#[test]
fn partitioned_file_from_proto_invalid_path() {
let proto = protobuf::PartitionedFile {
arrow_schema: None,
path: "foo//bar".to_string(),
size: 1,
last_modified_ns: 0,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ impl TryFromProto<&PartitionedFile> for protobuf::PartitionedFile {
))
})? as u64;
Ok(protobuf::PartitionedFile {
arrow_schema: pf
.arrow_schema
.as_ref()
.map(|s| s.as_ref().try_into())
.transpose()?,
path: pf.object_meta.location.as_ref().to_owned(),
size: pf.object_meta.size,
last_modified_ns,
Expand Down
Loading