diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 192ca390a8..250fc5e8d9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -330,7 +330,7 @@ impl CachingDeleteFileLoader {
         mut stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        let mut result_predicate = AlwaysTrue;
+        let mut row_predicates = Vec::new();
         let mut batch_schema_iceberg: Option<Schema> = None;
         let accessor = EqDelRecordBatchPartnerAccessor;
 
@@ -374,10 +374,29 @@ impl CachingDeleteFileLoader {
                         row_predicate = row_predicate.and(cell_predicate)
                     }
                 }
-                result_predicate = result_predicate.and(row_predicate.not());
+                row_predicates.push(row_predicate.not().rewrite_not());
             }
         }
-        Ok(result_predicate.rewrite_not())
+
+        // All row predicates are combined into a single predicate by building a balanced binary tree.
+        // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
+        while row_predicates.len() > 1 {
+            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
+            let mut iter = row_predicates.into_iter();
+            while let Some(p1) = iter.next() {
+                if let Some(p2) = iter.next() {
+                    next_level.push(p1.and(p2));
+                } else {
+                    next_level.push(p1);
+                }
+            }
+            row_predicates = next_level;
+        }
+
+        match row_predicates.pop() {
+            Some(p) => Ok(p),
+            None => Ok(AlwaysTrue),
+        }
     }
 }
 
@@ -912,4 +931,51 @@ mod tests {
             result.err()
         );
     }
+
+    #[tokio::test]
+    async fn test_large_equality_delete_batch_stack_overflow() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        // Create a large batch of equality deletes
+        let num_rows = 20_000;
+        let col_y_vals: Vec<i64> = (0..num_rows).collect();
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let schema = Arc::new(arrow_schema::Schema::new(vec![
+            Field::new("y", arrow_schema::DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let record_batch = RecordBatch::try_new(schema.clone(), vec![col_y]).unwrap();
+
+        // Write to file
+        let path = format!("{}/large-eq-deletes.parquet", &table_location);
+        let file = File::create(&path).unwrap();
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2]);
+
+        let result = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await;
+
+        assert!(result.is_ok());
+    }
 }
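
For context, here is a minimal standalone sketch of the pairwise-reduction idea the patch applies. It is not part of the patch: `Expr`, `balanced_and`, and `main` are hypothetical stand-ins for iceberg's `Predicate` machinery. Combining n predicates level by level yields a tree of depth about log2(n), whereas a left fold yields depth n, which is what could overflow the stack for large equality delete files.

```rust
/// Toy expression tree standing in for iceberg's `Predicate` (hypothetical).
enum Expr {
    Leaf(i64),
    And(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Recursive depth; very deep trees make any such traversal
    /// (or the recursive drop glue) liable to overflow the stack.
    fn depth(&self) -> usize {
        match self {
            Expr::Leaf(_) => 1,
            Expr::And(l, r) => 1 + l.depth().max(r.depth()),
        }
    }
}

/// Reduce a vector of expressions pairwise, halving the count each round,
/// so the final tree has depth O(log n) instead of the O(n) a fold produces.
fn balanced_and(mut exprs: Vec<Expr>) -> Option<Expr> {
    while exprs.len() > 1 {
        let mut next_level = Vec::with_capacity(exprs.len().div_ceil(2));
        let mut iter = exprs.into_iter();
        while let Some(p1) = iter.next() {
            match iter.next() {
                Some(p2) => next_level.push(Expr::And(Box::new(p1), Box::new(p2))),
                None => next_level.push(p1), // odd leftover carries over unchanged
            }
        }
        exprs = next_level;
    }
    exprs.pop()
}

fn main() {
    // Same scale as the regression test above: 20_000 row predicates.
    let leaves: Vec<Expr> = (0..20_000).map(Expr::Leaf).collect();
    let tree = balanced_and(leaves).expect("non-empty input");
    // ceil(log2(20_000)) = 15 reduction rounds, so depth is at most 16,
    // not 20_000.
    assert!(tree.depth() <= 16);
    println!("depth = {}", tree.depth());
}
```

A fold such as `exprs.into_iter().fold(acc, |a, b| a.and(b))` would instead chain every predicate off one spine, and the 20_000-deep tree is exactly what the added `test_large_equality_delete_batch_stack_overflow` guards against.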