[branch-54] Add datafusion.execution.enable_file_stream_work_stealing config #23296

Merged: alamb merged 4 commits into apache:branch-54 from andygrove:backport-23294-branch-54 on Jul 3, 2026

@andygrove

Member

Which issue does this PR close?

Rationale for this change

FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work queue from every file group and lets whichever output partition goes idle first steal the next unopened file (or byte-range morsel). This assumes all output partitions of a scan are polled concurrently in one process.

Executors that run each output partition as an isolated task in a separate process — Ballista and datafusion-distributed — never poll the sibling partitions. The single polled partition drains the whole shared queue and reads files belonging to other partitions, so every isolated task reads the entire input and the scan output is inflated by the partition count. This is a correctness bug for those executors, not just a performance one.
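
The inflation described above can be reproduced with a toy model. This is a minimal sketch, not DataFusion code: `seed_shared_queue`, `files_read_by_isolated_task`, and `total_reads` are hypothetical names, and it assumes each isolated task builds its own copy of the scan (and therefore its own freshly seeded queue) and polls exactly one partition.

```rust
use std::collections::VecDeque;

/// Toy stand-in for the shared work queue: every file group is flattened
/// into one queue that any polled partition may pop from.
fn seed_shared_queue(file_groups: &[Vec<&'static str>]) -> VecDeque<&'static str> {
    file_groups.iter().flatten().copied().collect()
}

/// Simulates an isolated task that polls only `partition`.
/// With work stealing on, no sibling competes for the queue, so the single
/// polled partition drains all of it. With work stealing off, the task
/// reads only its own file group.
fn files_read_by_isolated_task(
    file_groups: &[Vec<&'static str>],
    partition: usize,
    work_stealing: bool,
) -> Vec<&'static str> {
    if work_stealing {
        let mut queue = seed_shared_queue(file_groups);
        let mut read = Vec::new();
        while let Some(file) = queue.pop_front() {
            read.push(file);
        }
        read
    } else {
        file_groups[partition].clone()
    }
}

/// Total file reads when every partition runs as its own isolated task.
fn total_reads(file_groups: &[Vec<&'static str>], work_stealing: bool) -> usize {
    (0..file_groups.len())
        .map(|p| files_read_by_isolated_task(file_groups, p, work_stealing).len())
        .sum()
}

fn main() {
    let groups = vec![vec!["a.parquet"], vec!["b.parquet"], vec!["c.parquet"]];
    println!("stealing on:  {} reads", total_reads(&groups, true));
    println!("stealing off: {} reads", total_reads(&groups, false));
}
```

With three single-file groups, the model reports 9 reads with stealing on and 3 with it off, matching the "inflated by the partition count" description.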

The existing escape hatches (preserve_order, partitioned_by_file_group) are plan-level flags on FileScanConfig, not something a distributed executor can set centrally through the session config, and a plain repartitioned scan does not set partitioned_by_file_group. There is no session-level off switch, unlike datafusion.optimizer.enable_dynamic_filter_pushdown, which exists precisely so consumers that cannot support runtime cross-partition state can disable it.

What changes are included in this PR?

This is a backport of #23294. The changes are identical in intent; two conflicts were resolved for branch-54, which does not yet carry the output_partitioning field on FileScanConfig or the enable_migration_aggregate config that exist on main:

  • Add datafusion.execution.enable_file_stream_work_stealing (default true). When false, FileScanConfig::create_sibling_state returns None, so each partition falls back to WorkSource::Local and reads only its own file group.
  • Thread &ConfigOptions into DataSource::create_sibling_state so the flag is read from the session config at execute time. As a session config value it round-trips through datafusion-proto with no proto schema change.
  • Regenerate configs.md and add the setting to information_schema.slt.
  • Add the regression test morsel_disabled_work_stealing_keeps_files_local using the file's standard morsel snapshot harness.
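
The gating described in the first two bullets can be sketched as follows. This is an illustration under stated assumptions rather than the actual implementation: `ToyScan`, `SiblingState`, and the flattened `ConfigOptions` struct are hypothetical stand-ins (the real flag lives under the `datafusion.execution` namespace), and only the names `create_sibling_state`, `WorkSource::Shared`, and `WorkSource::Local` come from the PR text.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the relevant slice of the session config.
struct ConfigOptions {
    enable_file_stream_work_stealing: bool,
}

impl Default for ConfigOptions {
    fn default() -> Self {
        // The PR states that the flag defaults to true.
        Self { enable_file_stream_work_stealing: true }
    }
}

/// Hypothetical stand-in for the state shared by sibling partitions.
#[derive(Debug)]
struct SiblingState {
    queued_files: Vec<String>,
}

/// Where a partition takes its work from, named after the PR's WorkSource.
#[derive(Debug, PartialEq)]
enum WorkSource {
    Shared,
    Local,
}

/// Toy scan holding one list of files per output partition.
struct ToyScan {
    file_groups: Vec<Vec<String>>,
}

impl ToyScan {
    /// Returns None when the flag is off, so no shared queue exists.
    fn create_sibling_state(&self, config: &ConfigOptions) -> Option<Arc<SiblingState>> {
        if !config.enable_file_stream_work_stealing {
            return None;
        }
        Some(Arc::new(SiblingState {
            queued_files: self.file_groups.iter().flatten().cloned().collect(),
        }))
    }

    /// A partition uses the shared queue only if sibling state was created.
    fn work_source(&self, config: &ConfigOptions) -> WorkSource {
        match self.create_sibling_state(config) {
            Some(_) => WorkSource::Shared,
            None => WorkSource::Local,
        }
    }
}

fn main() {
    let scan = ToyScan {
        file_groups: vec![vec!["a.parquet".to_string()], vec!["b.parquet".to_string()]],
    };
    let on = ConfigOptions::default();
    let off = ConfigOptions { enable_file_stream_work_stealing: false };
    let queued = scan.create_sibling_state(&on).map(|s| s.queued_files.len());
    println!("on:  {:?}, queued files: {:?}", scan.work_source(&on), queued);
    println!("off: {:?}", scan.work_source(&off));
}
```

Passing the config by reference at call time, rather than storing the flag on the scan, mirrors the PR's stated goal of reading the value from the session config at execute time.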

Are these changes tested?

Yes. The file_stream test suite (including the new morsel_disabled_work_stealing_keeps_files_local) passes, and cargo clippy is clean. The existing sibling work-stealing tests continue to pass with the default.

Are there any user-facing changes?

Yes. This PR adds a new session config, datafusion.execution.enable_file_stream_work_stealing. It defaults to true, so existing behavior is unchanged. DataSource::create_sibling_state also gains a &ConfigOptions parameter, which is an API change for anyone implementing the DataSource trait directly.

andygrove added 4 commits July 2, 2026 11:00
Add an ignored regression test for apache#23293. FileStream sibling
work-stealing seeds one shared work queue from every file group and
relies on all output partitions being polled concurrently in one
process. An executor that runs each partition as an isolated task polls
only one partition, which then drains the whole queue and reads files
belonging to other partitions, inflating scan output by the partition
count.

The test builds and drives only partition 0 and asserts it reads solely
its own file. It fails on main by design, so it is #[ignore]d with its
assertion intact as a caught regression to triage.

(cherry picked from commit 1b2760b)

FileStream sibling work-stealing (WorkSource::Shared) seeds one shared work
queue from every file group and lets whichever output partition goes idle
first steal the next file or byte-range morsel. This assumes all output
partitions of a scan are polled concurrently in one process. Executors that
run each output partition as an isolated task in a separate process (Ballista,
datafusion-distributed) never poll the sibling partitions, so the single polled
partition drains the whole queue and reads files belonging to other partitions,
inflating the scan output by the partition count. That is a correctness bug for
those executors, and the existing escape hatches (preserve_order,
partitioned_by_file_group) are plan-level flags, not a session config they can
set centrally.

Add datafusion.execution.enable_file_stream_work_stealing (default true),
checked in FileScanConfig::create_sibling_state: when false it returns None so
each partition falls back to reading only its own file group. This mirrors the
enable_dynamic_filter_pushdown precedent and round-trips through datafusion-proto
as a config value. Thread ConfigOptions into DataSource::create_sibling_state so
the flag is read from the session config at execute time.

Turn the #[ignore]'d reproduction test into a passing regression test that
drives only partition 0 and checks both behaviors: with the default (stealing
on) partition 0 also reads partition 1's file, and with the flag off it reads
only its own.

Closes apache#23293.

(cherry picked from commit 482b04b)

…param

Drop the redundant bool from the test's drive_partition0 helper (assert the
shared-queue state directly via create_sibling_state) and document the config
parameter on DataSource::create_sibling_state.

(cherry picked from commit b836a6d)

- Reword the config docstring in user-visible terms (dynamic runtime
  rebalancing) rather than internal work-queue mechanics.
- Note on FileScanConfig::file_groups that files may be reassigned across
  partitions at runtime when work stealing is enabled unless preserve_order or
  partitioned_by_file_group is set.
- Replace the bespoke isolated-partition test with the file's standard morsel
  snapshot harness: FileStreamMorselTest gains with_enable_file_stream_work_stealing,
  and the test reuses two_partition_morsel_test to show that disabling the flag
  keeps each partition's files local.

Regenerate configs.md and information_schema.slt for the reworded docstring.

(cherry picked from commit 608915e)
@github-actions bot added labels on Jul 2, 2026: documentation (Improvements or additions to documentation), sqllogictest (SQL Logic Tests (.slt)), common (Related to common crate), datasource (Changes to the datasource crate)
@alamb changed the title from "backport #23294: add datafusion.execution.enable_file_stream_work_stealing config" to "[branch-54]: add datafusion.execution.enable_file_stream_work_stealing config" on Jul 3, 2026
@alamb changed the title from "[branch-54]: add datafusion.execution.enable_file_stream_work_stealing config" to "[branch-54] Add datafusion.execution.enable_file_stream_work_stealing config" on Jul 3, 2026
@alamb merged commit 209e49e into apache:branch-54 on Jul 3, 2026
34 of 35 checks passed
@alamb

alamb commented Jul 3, 2026

Contributor

Thanks @andygrove and @xudong963


3 participants