diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9096e5c366dac..cbfa971055f5e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -639,6 +639,19 @@ config_namespace! { /// Should DataFusion keep the columns used for partition_by in the output RecordBatches pub keep_partition_by_columns: bool, default = false + /// When `true` (the default), DataFusion's built-in file scans + /// dynamically rebalance files across partitions at query execution + /// time: a partition that goes idle reads files (or byte-range morsels) + /// originally assigned to a sibling partition, which keeps all + /// partitions busy in a single process. + /// + /// Executors that depend on the plan-time partition assignment — such as + /// Ballista and datafusion-distributed, which run each partition as an + /// isolated task and never poll the siblings — should set this to + /// `false` so each partition reads only its own file group and no + /// runtime reassignment occurs. + pub enable_file_stream_work_stealing: bool, default = true + /// Aggregation ratio (number of distinct groups / number of input rows) /// threshold for skipping partial aggregation. If the value is greater /// then partial aggregation will skip aggregation for further input diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 4bf86e17d387d..050cd54bb0adf 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -159,6 +159,12 @@ pub struct FileScanConfig { /// DataFusion may attempt to read each partition of files /// concurrently, however files *within* a partition will be read /// sequentially, one after the next. + /// + /// Note that when `datafusion.execution.enable_file_stream_work_stealing` + /// is enabled (the default), files may be reassigned to a different + /// partition at runtime unless `preserve_order` or + /// `partitioned_by_file_group` is set, so a file is not guaranteed to be + /// read by the partition it is grouped under here. pub file_groups: Vec, /// Table constraints pub constraints: Constraints, @@ -1038,10 +1044,18 @@ impl DataSource for FileScanConfig { /// during one execution. /// /// This returns `None` when sibling streams must not share work, such as - /// when file order must be preserved or the file groups define the output - /// partitioning needed for the rest of the plan - fn create_sibling_state(&self) -> Option> { - if self.preserve_order || self.partitioned_by_file_group { + /// when file order must be preserved, the file groups define the output + /// partitioning needed for the rest of the plan, or work stealing is + /// disabled via + /// `datafusion.execution.enable_file_stream_work_stealing`. + fn create_sibling_state( + &self, + config: &ConfigOptions, + ) -> Option> { + if self.preserve_order + || self.partitioned_by_file_group + || !config.execution.enable_file_stream_work_stealing + { return None; } diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index e277690cff810..9ea9a88e393c4 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -182,6 +182,7 @@ mod tests { use arrow::array::{AsArray, RecordBatch}; use arrow::datatypes::{DataType, Field, Int32Type, Schema}; use datafusion_common::DataFusionError; + use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -1131,6 +1132,40 @@ mod tests { Ok(()) } + /// Verifies that disabling `enable_file_stream_work_stealing` keeps each + /// stream's files local, so a sibling cannot steal them at runtime. + /// + /// Covers : executors + /// that run each output partition as an isolated task in a separate process + /// (Ballista, datafusion-distributed) poll only their own partition, so the + /// shared work queue would let that one partition drain files belonging to + /// its siblings. Disabling the flag falls back to per-partition file groups. + #[tokio::test] + async fn morsel_disabled_work_stealing_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen`, but with work + // stealing disabled via config + let test = two_partition_morsel_test() + .with_enable_file_stream_work_stealing(false) + .with_file_stream_events(false); + + // Even though Partition 1 is polled first, it cannot steal the three + // files assigned to Partition 0; each partition reads only its own. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + /// Verifies that an empty sibling can immediately steal shared files when /// it is polled before the stream that originally owned them. #[tokio::test] @@ -1216,7 +1251,7 @@ mod tests { let unlimited_config = test.test_config(); let limited_config = test.clone().with_limit(1).test_config(); let shared_work_source = limited_config - .create_sibling_state() + .create_sibling_state(&ConfigOptions::default()) .and_then(|state| state.as_ref().downcast_ref::().cloned()) .expect("shared work source"); let limited_metrics = ExecutionPlanMetricsSet::new(); @@ -1332,6 +1367,7 @@ mod tests { partition_files: BTreeMap>, preserve_order: bool, partitioned_by_file_group: bool, + enable_file_stream_work_stealing: bool, file_stream_events: bool, build_streams_on_first_read: bool, reads: Vec, @@ -1346,6 +1382,7 @@ mod tests { partition_files: BTreeMap::new(), preserve_order: false, partitioned_by_file_group: false, + enable_file_stream_work_stealing: true, file_stream_events: true, build_streams_on_first_read: false, reads: vec![], @@ -1391,6 +1428,14 @@ mod tests { self } + /// Sets `datafusion.execution.enable_file_stream_work_stealing`. When + /// disabled, each stream keeps its own files local instead of sharing a + /// work queue with its siblings. + fn with_enable_file_stream_work_stealing(mut self, enable: bool) -> Self { + self.enable_file_stream_work_stealing = enable; + self + } + /// Controls whether scheduler events are included in the snapshot. /// /// When disabled, `run()` still includes the event section header but @@ -1468,9 +1513,13 @@ mod tests { // `FileStream`s directly, bypassing `DataSourceExec`, so they must // perform the same setup explicitly when exercising sibling-stream // work stealing. - let shared_work_source = config.create_sibling_state().and_then(|state| { - state.as_ref().downcast_ref::().cloned() - }); + let mut options = ConfigOptions::default(); + options.execution.enable_file_stream_work_stealing = + self.enable_file_stream_work_stealing; + let shared_work_source = + config.create_sibling_state(&options).and_then(|state| { + state.as_ref().downcast_ref::().cloned() + }); if !self.build_streams_on_first_read { for partition in build_order { let stream = FileStreamBuilder::new(&config) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index af4bc09504937..6bb574fcadff3 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -243,9 +243,16 @@ pub trait DataSource: Any + Send + Sync + Debug { /// Create per execution state to share across sibling instances of this /// data source during one execution. /// + /// `config` is the session configuration, so implementations can honor + /// options that disable sibling sharing (returning `None`) for consumers + /// that cannot poll all partitions in one process. + /// /// Returns `None` (the default) if this data source has /// no sibling-shared execution state. - fn create_sibling_state(&self) -> Option> { + fn create_sibling_state( + &self, + _config: &ConfigOptions, + ) -> Option> { None } @@ -391,7 +398,10 @@ impl ExecutionPlan for DataSourceExec { ) -> Result { let shared_state = self .execution_state - .get_or_init(|| self.data_source.create_sibling_state()) + .get_or_init(|| { + self.data_source + .create_sibling_state(context.session_config().options()) + }) .clone(); let args = OpenArgs::new(partition, Arc::clone(&context)) .with_shared_state(shared_state); diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 37683b8e3095a..6f33483031ace 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -218,6 +218,7 @@ datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics true datafusion.execution.enable_ansi_mode false +datafusion.execution.enable_file_stream_work_stealing true datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false datafusion.execution.hash_join_buffering_capacity 0 @@ -372,6 +373,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. +datafusion.execution.enable_file_stream_work_stealing true When `true` (the default), DataFusion's built-in file scans dynamically rebalance files across partitions at query execution time: a partition that goes idle reads files (or byte-range morsels) originally assigned to a sibling partition, which keeps all partitions busy in a single process. Executors that depend on the plan-time partition assignment — such as Ballista and datafusion-distributed, which run each partition as an isolated task and never poll the siblings — should set this to `false` so each partition reads only its own file group and no runtime reassignment occurs. datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6decc17d14ea9..e8efee5e00368 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -133,6 +133,7 @@ The following configuration settings are available: | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.enable_file_stream_work_stealing | true | When `true` (the default), DataFusion's built-in file scans dynamically rebalance files across partitions at query execution time: a partition that goes idle reads files (or byte-range morsels) originally assigned to a sibling partition, which keeps all partitions busy in a single process. Executors that depend on the plan-time partition assignment — such as Ballista and datafusion-distributed, which run each partition as an isolated task and never poll the siblings — should set this to `false` so each partition reads only its own file group and no runtime reassignment occurs. | | datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | | datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. |