Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,19 @@ config_namespace! {
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// When `true` (the default), DataFusion's built-in file scans
/// dynamically rebalance files across partitions at query execution
/// time: a partition that goes idle reads files (or byte-range morsels)
/// originally assigned to a sibling partition, which keeps all
/// partitions busy in a single process.
///
/// Executors that depend on the plan-time partition assignment — such as
/// Ballista and datafusion-distributed, which run each partition as an
/// isolated task and never poll the siblings — should set this to
/// `false` so each partition reads only its own file group and no
/// runtime reassignment occurs.
pub enable_file_stream_work_stealing: bool, default = true

/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
Expand Down
22 changes: 18 additions & 4 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ pub struct FileScanConfig {
/// DataFusion may attempt to read each partition of files
/// concurrently, however files *within* a partition will be read
/// sequentially, one after the next.
///
/// Note that when `datafusion.execution.enable_file_stream_work_stealing`
/// is enabled (the default), files may be reassigned to a different
/// partition at runtime unless `preserve_order` or
/// `partitioned_by_file_group` is set, so a file is not guaranteed to be
/// read by the partition it is grouped under here.
pub file_groups: Vec<FileGroup>,
/// Table constraints
pub constraints: Constraints,
Expand Down Expand Up @@ -1038,10 +1044,18 @@ impl DataSource for FileScanConfig {
/// during one execution.
///
/// This returns `None` when sibling streams must not share work, such as
/// when file order must be preserved or the file groups define the output
/// partitioning needed for the rest of the plan
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
if self.preserve_order || self.partitioned_by_file_group {
/// when file order must be preserved, the file groups define the output
/// partitioning needed for the rest of the plan, or work stealing is
/// disabled via
/// `datafusion.execution.enable_file_stream_work_stealing`.
fn create_sibling_state(
&self,
config: &ConfigOptions,
) -> Option<Arc<dyn Any + Send + Sync>> {
if self.preserve_order
|| self.partitioned_by_file_group
|| !config.execution.enable_file_stream_work_stealing
{
return None;
}

Expand Down
57 changes: 53 additions & 4 deletions datafusion/datasource/src/file_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod tests {
use arrow::array::{AsArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use datafusion_common::DataFusionError;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
Expand Down Expand Up @@ -1131,6 +1132,40 @@ mod tests {
Ok(())
}

/// Verifies that disabling `enable_file_stream_work_stealing` keeps each
/// stream's files local, so a sibling cannot steal them at runtime.
///
/// Covers <https://github.com/apache/datafusion/issues/23293>: executors
/// that run each output partition as an isolated task in a separate process
/// (Ballista, datafusion-distributed) poll only their own partition, so the
/// shared work queue would let that one partition drain files belonging to
/// its siblings. Disabling the flag falls back to per-partition file groups.
#[tokio::test]
async fn morsel_disabled_work_stealing_keeps_files_local() -> Result<()> {
// Same fixture as `morsel_shared_files_can_be_stolen`, but with work
// stealing disabled via config. Scheduler events are left out of the
// snapshot so it pins only which batches each partition produced.
let test = two_partition_morsel_test()
.with_enable_file_stream_work_stealing(false)
.with_file_stream_events(false);

// Even though Partition 1 is polled first, it cannot steal the three
// files assigned to Partition 0; each partition reads only its own.
insta::assert_snapshot!(test.run().await.unwrap(), @r"
----- Partition 0 -----
Batch: 101
Batch: 102
Batch: 103
Done
----- Partition 1 -----
Batch: 201
Done
----- File Stream Events -----
(omitted due to with_file_stream_events(false))
");

Ok(())
}

/// Verifies that an empty sibling can immediately steal shared files when
/// it is polled before the stream that originally owned them.
#[tokio::test]
Expand Down Expand Up @@ -1216,7 +1251,7 @@ mod tests {
let unlimited_config = test.test_config();
let limited_config = test.clone().with_limit(1).test_config();
let shared_work_source = limited_config
.create_sibling_state()
.create_sibling_state(&ConfigOptions::default())
.and_then(|state| state.as_ref().downcast_ref::<SharedWorkSource>().cloned())
.expect("shared work source");
let limited_metrics = ExecutionPlanMetricsSet::new();
Expand Down Expand Up @@ -1332,6 +1367,7 @@ mod tests {
partition_files: BTreeMap<PartitionId, Vec<String>>,
preserve_order: bool,
partitioned_by_file_group: bool,
enable_file_stream_work_stealing: bool,
file_stream_events: bool,
build_streams_on_first_read: bool,
reads: Vec<PartitionId>,
Expand All @@ -1346,6 +1382,7 @@ mod tests {
partition_files: BTreeMap::new(),
preserve_order: false,
partitioned_by_file_group: false,
enable_file_stream_work_stealing: true,
file_stream_events: true,
build_streams_on_first_read: false,
reads: vec![],
Expand Down Expand Up @@ -1391,6 +1428,14 @@ mod tests {
self
}

/// Sets `datafusion.execution.enable_file_stream_work_stealing`. When
/// disabled, each stream keeps its own files local instead of sharing a
/// work queue with its siblings.
fn with_enable_file_stream_work_stealing(mut self, enable: bool) -> Self {
self.enable_file_stream_work_stealing = enable;
self
}

/// Controls whether scheduler events are included in the snapshot.
///
/// When disabled, `run()` still includes the event section header but
Expand Down Expand Up @@ -1468,9 +1513,13 @@ mod tests {
// `FileStream`s directly, bypassing `DataSourceExec`, so they must
// perform the same setup explicitly when exercising sibling-stream
// work stealing.
let shared_work_source = config.create_sibling_state().and_then(|state| {
state.as_ref().downcast_ref::<SharedWorkSource>().cloned()
});
let mut options = ConfigOptions::default();
options.execution.enable_file_stream_work_stealing =
self.enable_file_stream_work_stealing;
let shared_work_source =
config.create_sibling_state(&options).and_then(|state| {
state.as_ref().downcast_ref::<SharedWorkSource>().cloned()
});
if !self.build_streams_on_first_read {
for partition in build_order {
let stream = FileStreamBuilder::new(&config)
Expand Down
14 changes: 12 additions & 2 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,16 @@ pub trait DataSource: Any + Send + Sync + Debug {
/// Create per execution state to share across sibling instances of this
/// data source during one execution.
///
/// `config` is the session configuration, so implementations can honor
/// options that disable sibling sharing (returning `None`) for consumers
/// that cannot poll all partitions in one process.
///
/// Returns `None` (the default) if this data source has
/// no sibling-shared execution state.
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
fn create_sibling_state(
&self,
_config: &ConfigOptions,
) -> Option<Arc<dyn Any + Send + Sync>> {
None
}

Expand Down Expand Up @@ -391,7 +398,10 @@ impl ExecutionPlan for DataSourceExec {
) -> Result<SendableRecordBatchStream> {
let shared_state = self
.execution_state
.get_or_init(|| self.data_source.create_sibling_state())
.get_or_init(|| {
self.data_source
.create_sibling_state(context.session_config().options())
})
.clone();
let args = OpenArgs::new(partition, Arc::clone(&context))
.with_shared_state(shared_state);
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics true
datafusion.execution.enable_ansi_mode false
datafusion.execution.enable_file_stream_work_stealing true
datafusion.execution.enable_recursive_ctes true
datafusion.execution.enforce_batch_size_in_joins false
datafusion.execution.hash_join_buffering_capacity 0
Expand Down Expand Up @@ -372,6 +373,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true.
datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default.
datafusion.execution.enable_file_stream_work_stealing true When `true` (the default), DataFusion's built-in file scans dynamically rebalance files across partitions at query execution time: a partition that goes idle reads files (or byte-range morsels) originally assigned to a sibling partition, which keeps all partitions busy in a single process. Executors that depend on the plan-time partition assignment — such as Ballista and datafusion-distributed, which run each partition as an isolated task and never poll the siblings — should set this to `false` so each partition reads only its own file group and no runtime reassignment occurs.
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it.
Expand Down
Loading
Loading