@@ -330,7 +330,8 @@ impl CachingDeleteFileLoader {
         mut stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        let mut result_predicate = AlwaysTrue;
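+        // Collect one predicate per delete row; they are combined below.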
+        let mut row_predicates = Vec::new();
         let mut batch_schema_iceberg: Option<Schema> = None;
         let accessor = EqDelRecordBatchPartnerAccessor;

@@ -374,10 +374,33 @@ impl CachingDeleteFileLoader {
                         row_predicate = row_predicate.and(cell_predicate)
                     }
                 }
-                result_predicate = result_predicate.and(row_predicate.not());
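+                // Apply rewrite_not per row, while the predicate is still
+                // shallow, instead of once over the fully combined chain.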
+                row_predicates.push(row_predicate.not().rewrite_not());
             }
         }
-        Ok(result_predicate.rewrite_not())
+
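+        // Combine the per-row predicates pairwise so the AND tree stays
+        // balanced instead of degenerating into a deep left-leaning chain,
+        // which previously overflowed the stack for large delete files.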
+        while row_predicates.len() > 1 {
+            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
+            let mut iter = row_predicates.into_iter();
+            while let Some(p1) = iter.next() {
+                if let Some(p2) = iter.next() {
+                    next_level.push(p1.and(p2));
+                } else {
+                    next_level.push(p1);
+                }
+            }
+            row_predicates = next_level;
+        }
+
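+        // No delete rows at all: keep every row (AlwaysTrue).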
+        match row_predicates.pop() {
+            Some(p) => Ok(p),
+            None => Ok(AlwaysTrue),
+        }
     }
 }

@@ -912,4 +929,53 @@ mod tests {
             result.err()
         );
     }
+
+    #[tokio::test]
+    async fn test_large_equality_delete_batch_stack_overflow() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        // Create a large batch of equality deletes
+        let num_rows = 20_000;
+        let col_y_vals: Vec<i64> = (0..num_rows).collect();
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let schema = Arc::new(arrow_schema::Schema::new(vec![
+            Field::new("y", arrow_schema::DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let record_batch = RecordBatch::try_new(schema.clone(), vec![col_y]).unwrap();
+
+        // Write to file
+        let path = format!("{}/large-eq-deletes.parquet", &table_location);
+        let file = File::create(&path).unwrap();
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2]);
+
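+        // Regression check: 20k delete rows previously produced a predicate
+        // chain deep enough to overflow the stack.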
+        let result = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await;
+
+        assert!(result.is_ok());
+    }
 }