Add an example of an in-process model of distributed execution #23282

Draft
andygrove wants to merge 4 commits into apache:main from andygrove:feat/datafusion-scheduler
@andygrove

@andygrove andygrove commented Jul 1, 2026

Member

🧑 I work on Ballista, and I've been looking at datafusion-distributed lately too, and the same thing keeps biting distributed engines built on DataFusion: DataFusion's physical plans are written and tested assuming everything runs in one process, with all partitions of an operator polled together by cooperative threads on a shared runtime. Real distributed engines don't run that way — they cut the plan into stages, serialize each stage, and run each task in isolation with its own plan instance and its own task context. When a change quietly assumes the in-process model it still passes DataFusion's own tests, but it breaks Ballista and datafusion-distributed downstream, and we usually don't find out until much later.

This PR adds a small, opt-in crate that models that execution style in-process: it breaks a query into stages, serializes each stage, and executes tasks in isolation over an in-memory exchange — without actually building a distributed system (no network, no gRPC, no separate processes). I'm hoping it's useful for two things:

  1. As a reference implementation / example of how distributed execution works on top of DataFusion. It's a much smaller thing to read than a full engine, and it shows the parts that matter: breaking a query into stages, serializing plans, and executing tasks in isolation rather than with cooperative threads.
  2. As a regression guard. Once we run a subset of these tests in CI, they'll catch the class of change that breaks isolated execution before it ships to downstream projects. It already caught one on its first run (a scan that reads the whole table when its partitions are executed in isolation — details in the AI section below).

It's deliberately not trying to be another distributed engine — datafusion-distributed already fills that role well. This is meant to be the lightweight, dependency-free thing that can live in-tree and make life easier for the projects built on top of DataFusion.

Opening as a draft to get early feedback on whether this is worth having in the repo and on the overall approach.


🤖 The section below was drafted by an AI assistant to describe the change in detail.

Which issue does this PR close?

No existing issue — this is a draft to gauge interest. Happy to file a tracking issue if there's appetite for it.

Rationale for this change

DataFusion operators are developed against a single in-process plan tree: one Arc<dyn ExecutionPlan> instance, state shared via Arc, and all partitions of an operator polled together on one runtime. Distributed engines built on DataFusion violate every one of those assumptions:

  • the plan is cut into stages at shuffle boundaries, and each stage is serialized (both Ballista and datafusion-distributed use datafusion-proto) and shipped elsewhere;
  • each task runs one partition of a stage, on its own decoded plan instance, with no shared execution state with sibling tasks;
  • data crosses stage boundaries as serialized batches over a transport, never passed by reference.

There is currently no test surface in DataFusion that exercises these assumptions, so a change that quietly depends on the in-process model compiles, passes CI, and only breaks once it reaches a downstream engine. This crate provides that surface, in-process and with no transport dependencies, so it can eventually run in DataFusion's own CI.

What changes are included in this PR?

A new top-level crate, datafusion-scheduler, added to the workspace exclude list so it is not a workspace member — neither cargo build nor cargo build --workspace compiles it, and it has zero impact on normal builds/CI. It builds only when explicitly targeted (cd datafusion-scheduler && cargo test). The only change to existing files is adding the crate to [workspace] exclude in the root Cargo.toml.

How it works:

  1. Stage splitting (create_stages) walks a physical-optimized ExecutionPlan and cuts a new stage at each RepartitionExec(Hash), CoalescePartitionsExec, and SortPreservingMergeExec, producing a StageGraph whose stage ids are allocated so producers precede consumers.
  2. Serialization + isolation (the core invariant): each stage plan is encoded once via datafusion-proto, and every task decodes its own fresh plan instance against its own SessionContext/TaskContext. No Arc execution state is shared between tasks — this is what reproduces the distributed model.
  3. Streaming in-memory exchange: ExchangeSinkExec hash-partitions its input and pushes Arrow-IPC-encoded frames into an InMemoryExchange of bounded channels; ExchangeSourceExec merges the producer channels and decodes them back. A custom ExchangeCodec (PhysicalExtensionCodec) serializes these two operators; the exchange itself is injected on decode, not serialized (the in-process analogue of a network endpoint).
  4. Executor: a spawn-all, no-barrier model — every task of every stage is spawned concurrently and wired together by the exchange's bounded channels (backpressure, not a barrier), mirroring datafusion-distributed's pipelined streaming, minus gRPC.
  5. Differential harness: run_distributed(ctx, plan, config) plus assert_distributed_eq, which asserts the distributed result equals single-node collect(plan).
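
To make step 1 concrete, here is a minimal sketch of cutting a plan tree into stages at exchange boundaries. It is a toy model under stated assumptions, not the code in this PR: `Plan`, `Stage`, `cut`, and `split_stages` are hypothetical names, and a single `Exchange` variant stands in for the three operators listed above. It only illustrates why visiting children first gives producers smaller stage ids than their consumers.

```rust
// Toy model only: `Plan`, `Stage`, `cut`, and `split_stages` are hypothetical
// names for illustration, not DataFusion or Ballista APIs.

/// A minimal physical plan tree. `Exchange` stands in for the operators the
/// description says are cut points (hash repartition, coalesce, merge).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
    Exchange(Box<Plan>),
    /// Placeholder left in a consumer stage: "read the output of stage `id`".
    StageInput(usize),
}

#[derive(Debug, PartialEq)]
struct Stage {
    id: usize,
    plan: Plan,
    /// Ids of the producer stages this stage reads from.
    inputs: Vec<usize>,
}

/// Rewrites `plan`, pushing a finished stage for every `Exchange` it finds.
/// Children are visited first, so producers get smaller ids than consumers.
fn cut(plan: Plan, stages: &mut Vec<Stage>, inputs: &mut Vec<usize>) -> Plan {
    match plan {
        Plan::Exchange(child) => {
            let mut child_inputs = Vec::new();
            let child_plan = cut(*child, stages, &mut child_inputs);
            let id = stages.len();
            stages.push(Stage { id, plan: child_plan, inputs: child_inputs });
            inputs.push(id);
            Plan::StageInput(id)
        }
        Plan::Filter(c) => Plan::Filter(Box::new(cut(*c, stages, inputs))),
        Plan::Aggregate(c) => Plan::Aggregate(Box::new(cut(*c, stages, inputs))),
        Plan::Join(l, r) => {
            let left = cut(*l, stages, inputs);
            let right = cut(*r, stages, inputs);
            Plan::Join(Box::new(left), Box::new(right))
        }
        // Scan and StageInput have no children to rewrite.
        leaf => leaf,
    }
}

/// Splits a plan into stages; the last stage holds the root of the query.
fn split_stages(root: Plan) -> Vec<Stage> {
    let mut stages = Vec::new();
    let mut inputs = Vec::new();
    let plan = cut(root, &mut stages, &mut inputs);
    let id = stages.len();
    stages.push(Stage { id, plan, inputs });
    stages
}
```

For an aggregate over an exchanged join of two exchanged inputs, this yields four stages: the two scan-side stages (ids 0 and 1), the join stage (id 2, reading from 0 and 1), and the final aggregate stage (id 3, reading from 2).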

It follows datafusion-distributed's streaming (no-disk, no-barrier) shape because that is the more general model, but the isolate-and-serialize contract it exercises is identical for Ballista, so a regression it catches applies to both.
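
The "backpressure, not a barrier" shape can be sketched with the standard library alone. This is an illustration under assumptions, not the PR's exchange: it uses OS threads and `std::sync::mpsc::sync_channel` instead of async tasks, sends plain `u64` keys instead of Arrow-IPC frames, and `partition_for` and `run_exchange` are hypothetical names. What it does show is every producer and consumer being spawned up front, with a full bounded channel as the only thing that slows a producer down.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Picks the output partition for a key, as a hash repartition would.
fn partition_for(key: u64, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}

/// Runs all producer and consumer tasks concurrently over bounded channels.
/// Returns, per consumer partition, the sum of the keys it received.
fn run_exchange(
    rows_per_producer: u64,
    producers: usize,
    consumers: usize,
    capacity: usize,
) -> Vec<u64> {
    // One bounded channel per consumer partition.
    let (senders, receivers): (Vec<SyncSender<u64>>, Vec<Receiver<u64>>) =
        (0..consumers).map(|_| sync_channel(capacity)).unzip();

    // Consumers and producers are all spawned up front: no stage barrier.
    let consumer_handles: Vec<_> = receivers
        .into_iter()
        .map(|rx| thread::spawn(move || rx.iter().sum::<u64>()))
        .collect();

    let producer_handles: Vec<_> = (0..producers)
        .map(|p| {
            // Each producer holds its own clone of every sender.
            let senders = senders.clone();
            thread::spawn(move || {
                for i in 0..rows_per_producer {
                    let key = p as u64 * rows_per_producer + i;
                    let target = partition_for(key, senders.len());
                    // `send` blocks while the channel is full: backpressure.
                    senders[target].send(key).unwrap();
                }
            })
        })
        .collect();

    // Drop the original senders so consumers see end-of-stream once every
    // producer has finished and dropped its clones.
    drop(senders);
    for handle in producer_handles {
        handle.join().unwrap();
    }
    consumer_handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect()
}
```

Calling `run_exchange(100, 3, 4, 1)` runs three producers against four consumers with a channel capacity of 1, the same tight setting the description's backpressure test uses. Every key arrives exactly once, so the per-consumer sums add up to the sum of all keys.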

A regression it already caught

The isolated-scan guard (regression_isolated_scan_reads_only_its_partition) reproduces a real behavior: under distributed execution, a 4-file CSV scan (100 rows) returns 4× the correct aggregate (n=400, sum=64800 versus the true n=100, sum=16200), because the scan reads the whole table in each isolated per-partition task. It reproduces with a bare DataFusion plan and a single execute(0, ..) call, with no distributed code involved, and traces to the morsel-driven file scan (datafusion/datasource/src/morsel/), which hands out file work from a pool shared across a plan instance's partitions. That pool is drained correctly only when all partitions are polled together. This is exactly the class of in-process assumption that breaks any engine running plan partitions as isolated tasks. The test is #[ignore]d with its assertion intact rather than weakened, as a caught regression to triage.
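
The mechanism can be illustrated with a toy model. This sketch does not reproduce the morsel code; `ToyScan`, `rows_read_shared`, and `rows_read_isolated` are hypothetical names, and the four files of 25 rows each are an assumption chosen to match the 100-row table above. It shows how a work pool shared by the partitions of one plan instance gives the right total in-process, and a 4× total once every task builds its own instance and runs a single partition.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Toy scan: all partitions of one plan instance pull files from one shared
/// pool. Each pool entry is the row count of one file.
struct ToyScan {
    pool: Arc<Mutex<VecDeque<usize>>>,
}

impl ToyScan {
    fn new(file_rows: &[usize]) -> Self {
        ToyScan {
            pool: Arc::new(Mutex::new(file_rows.iter().copied().collect())),
        }
    }

    /// Takes one file from the shared pool, if any is left.
    fn next_file(&self) -> Option<usize> {
        self.pool.lock().unwrap().pop_front()
    }

    /// Runs one partition to completion and returns the rows it read.
    /// The partition index is ignored: work comes from the shared pool.
    fn execute(&self, _partition: usize) -> usize {
        let mut rows = 0;
        while let Some(n) = self.next_file() {
            rows += n;
        }
        rows
    }
}

/// In-process model: one plan instance, so all partitions share one pool.
fn rows_read_shared(file_rows: &[usize], partitions: usize) -> usize {
    let scan = ToyScan::new(file_rows);
    (0..partitions).map(|p| scan.execute(p)).sum()
}

/// Distributed model: every task builds its own plan instance (standing in
/// for decoding a serialized stage) and executes a single partition.
fn rows_read_isolated(file_rows: &[usize], partitions: usize) -> usize {
    (0..partitions)
        .map(|p| ToyScan::new(file_rows).execute(p))
        .sum()
}
```

With `[25, 25, 25, 25]` and four partitions, `rows_read_shared` returns 100 and `rows_read_isolated` returns 400: each isolated task starts with a full pool and drains all of it.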

Prior art / attribution

No code is copied from either project, but the design draws on prior art in the DataFusion ecosystem. The stage-splitting approach (create_stages) is modeled on Ballista's DefaultDistributedPlanner::plan_query_stages_internal (reimplemented and simplified). The plan-serialization codec follows the same PhysicalExtensionCodec pattern that both Ballista and datafusion-distributed use. The streaming, no-barrier in-memory exchange is inspired by datafusion-distributed's pull-based Arrow Flight model, reimplemented in-process without gRPC. Both projects are Apache-2.0.

Are these changes tested?

Yes — the crate is almost entirely tests. Unit tests cover stage splitting (boundaries, topological id order, multi-input wiring), the exchange operators (streaming round-trip, tight-backpressure/no-deadlock, codec round-trip with overflow guards and partitioning fidelity), and executor error propagation. Integration tests assert assert_distributed_eq for hash join, sort-merge join, sort, and multi-stage aggregate→sort queries, plus a tight-backpressure (channel_capacity = 1) end-to-end streaming test. cargo test in the crate is green (18 passing, 2 intentionally #[ignore]d), with cargo clippy --all-targets -- -D warnings and cargo fmt clean.
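
The equivalence checks follow a differential pattern: run the same query two ways and compare. Below is a minimal sketch of that pattern on a toy group-by-sum, assuming nothing about the real harness. `aggregate_single`, `aggregate_partitioned`, and `same_rows` are hypothetical names, and the order-insensitive comparison is an assumption of this sketch (a sort query would need an order-sensitive check instead).

```rust
use std::collections::BTreeMap;

type Row = (String, i64);

/// Single-node reference: group by key and sum the values in one pass.
fn aggregate_single(rows: &[Row]) -> Vec<Row> {
    let mut acc: BTreeMap<String, i64> = BTreeMap::new();
    for (key, value) in rows {
        *acc.entry(key.clone()).or_insert(0) += value;
    }
    acc.into_iter().collect()
}

/// Partitioned variant: route rows to partitions by key, aggregate each
/// partition on its own, then concatenate the partition outputs.
fn aggregate_partitioned(rows: &[Row], partitions: usize) -> Vec<Row> {
    let mut buckets: Vec<Vec<Row>> = vec![Vec::new(); partitions];
    for row in rows {
        // Simple deterministic routing: equal keys always share a partition.
        let p = row.0.bytes().map(|b| b as usize).sum::<usize>() % partitions;
        buckets[p].push(row.clone());
    }
    buckets.iter().flat_map(|b| aggregate_single(b)).collect()
}

/// Order-insensitive comparison of two result sets.
fn same_rows(mut a: Vec<Row>, mut b: Vec<Row>) -> bool {
    a.sort();
    b.sort();
    a == b
}
```

A check in this style is `assert!(same_rows(aggregate_single(&rows), aggregate_partitioned(&rows, 3)))`. It fails if the partitioned path drops, duplicates, or mis-routes rows.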

Are there any user-facing changes?

None. The crate is excluded from the workspace and nothing in DataFusion depends on it, so normal builds, published crates, and CI are unaffected. It is opt-in only.

@andygrove andygrove force-pushed the feat/datafusion-scheduler branch from e2b9e4f to 2a0c9bc Compare July 1, 2026 17:22
…ocess distributed execution

Add `datafusion-scheduler`, an opt-in (workspace-excluded) crate that runs a
physical `ExecutionPlan` the way a distributed engine would — split into stages
at shuffle boundaries, each stage serialized via datafusion-proto, each task
executed from a freshly deserialized plan with its own TaskContext, data crossing
stage boundaries as Arrow-IPC frames over a streaming in-memory exchange —
entirely in one process.

It models the execution contract common to Ballista and datafusion-distributed
(serialize per stage, instantiate a fresh plan per task, execute one partition in
isolation) without any network/gRPC, so it can serve as a small reference example
and, once wired into CI, a regression guard for physical-plan changes that assume
shared in-process state.

- Scheduler splits the plan at RepartitionExec(Hash) / CoalescePartitionsExec /
  SortPreservingMergeExec into a StageGraph (producers before consumers).
- ExchangeSinkExec / ExchangeSourceExec move data over an InMemoryExchange of
  bounded channels carrying Arrow-IPC frames; ExchangeCodec (de)serializes them.
- Spawn-all, no-barrier streaming executor; each task decodes its own plan with
  its own SessionContext / TaskContext.
- run_distributed + assert_distributed_eq differential harness vs single-node
  collect; unit + equivalence + streaming tests.

The crate is added to [workspace] exclude and only builds when explicitly
targeted, so it does not affect normal builds or CI.
@andygrove andygrove force-pushed the feat/datafusion-scheduler branch from 2a0c9bc to 8d858e2 Compare July 1, 2026 17:49
@avantgardnerio

Contributor

As an owner of a Ballista fork, this would definitely make our lives easier. I've done 10 DF upgrades so far, and they all bring pain.

@alamb

alamb commented Jul 1, 2026

Contributor

Rather than adding a new crate, what would you think about adding this as a self-contained example to https://github.com/apache/datafusion/tree/main/datafusion-examples/examples?

I think that would make it clearer that we don't plan to ship this, and it would also increase the surface area of our in-repo tests?

@andygrove

Member Author

> Rather than adding a new crate, what would you think about adding this as a self-contained example to https://github.com/apache/datafusion/tree/main/datafusion-examples/examples?
>
> I think that would make it clearer that we don't plan to ship this, and it would also increase the surface area of our in-repo tests?

I would be fine with having it as an example, if we can still (eventually) have a CI test for it.

The motivation was to catch regressions that break the distributed execution model.

@alamb

alamb commented Jul 1, 2026

Contributor

> I would be fine with having it as an example, if we can still (eventually) have a CI test for it.
>
> The motivation was to catch regressions that break the distributed execution model.

We do run the examples as part of all CI runs

For example here is the run for this PR:
https://github.com/apache/datafusion/actions/runs/28536953772/job/84602159635?pr=23282

@andygrove

Member Author

> > I would be fine with having it as an example, if we can still (eventually) have a CI test for it.
> >
> > The motivation was to catch regressions that break the distributed execution model.
>
> We do run the examples as part of all CI runs
>
> For example here is the run for this PR: https://github.com/apache/datafusion/actions/runs/28536953772/job/84602159635?pr=23282

Running as an example would make it difficult to run the tests, and the motivation of the PR was to catch regressions through these tests.

It seems that there isn't much appetite for this approach, so I will close this PR.

I will instead maintain a branch in Ballista that I keep up-to-date with latest DataFusion main so that we can catch regressions before release. Thanks for the feedback.

@andygrove andygrove closed this Jul 2, 2026
@Dandandan

Contributor

I really do like the idea of a (maybe small, contained) example of distributed DataFusion that runs in CI, e.g. with some tests that run some benchmarks and check correctness.

@andygrove

Member Author

> I really do like the idea of a (maybe small, contained) example of distributed DataFusion that runs in CI, e.g. with some tests that run some benchmarks and check correctness.

Maybe we just need some targeted unit tests to mock up some of the aspects of distributed execution?

@alamb

alamb commented Jul 2, 2026

Contributor

> Running as an example would make it difficult to run the tests, and the motivation of the PR was to catch regressions through these tests.

I don't understand this assertion. Why does an example make it hard to run tests?

@andygrove

Member Author

> I don't understand this assertion. Why does an example make it hard to run tests?

Maybe I misinterpreted your comment #23282 (comment). I thought you were suggesting making it an example so as to not increase the number of tests.

I will reopen the PR and convert to an example, with tests, so we can discuss more.

@alamb

alamb commented Jul 2, 2026

Contributor

> > I don't understand this assertion. Why does an example make it hard to run tests?
>
> Maybe I misinterpreted your comment #23282 (comment). I thought you were suggesting making it an example so as to not increase the number of tests.
>
> I will reopen the PR and convert to an example, with tests, so we can discuss more.

What I would like to avoid is yet another crate if we can avoid it, as I think a new crate would make it harder to understand what code was meant to be reused and what was meant to be an example.

…example

Address PR apache#23282 review feedback: fold the datafusion-scheduler crate into
datafusion-examples/examples/scheduler/ as a runnable example, with all
integration tests preserved as `#[cfg(test)]` modules so `cargo test --example
scheduler` still exercises the isolated / serialized execution path.

- New example at `datafusion-examples/examples/scheduler/` mirrors the
  original crate layout (config, exchange/{sink,source,codec}, executor,
  scheduler, serde, stage, test_util) plus a `main.rs` runner exposing a
  `distributed_pipeline` subcommand.
- All tests folded inline: stage-splitting tests in `scheduler.rs`, exchange
  round-trip / backpressure / codec tests in `exchange/{mod,codec}.rs`, and
  the equivalence / error / single-stage / two-stage / streaming suites as
  separate test modules in `main.rs`. Same 18 passing + 2 ignored tests as
  before, plus a smoke test for the runnable example.
- Remove the `datafusion-scheduler/` crate and its workspace `exclude`.
- Add a "Scheduler Examples" entry to `datafusion-examples/README.md`.
@Dandandan

Dandandan commented Jul 2, 2026

Contributor

We had some discussion about it (at Coralogix), two inputs:

  • I think it would be nice if the example is as simple as possible: an educational example that shows how to split into stages and run it to get results.

  • To get coverage without having to write/maintain a lot of new tests, we could consider running it against the sqllogictest tests (probably starting with a subset to make it feasible): essentially running sqllogictest tests under the distributed execution mechanism. This would help get coverage for rarer aggregation / window functions, join types, execution plan properties, etc. (which a smaller test, e.g. only running TPC-H, would miss).

@andygrove andygrove reopened this Jul 2, 2026
@andygrove

Member Author

>   • I think it would be nice if the example is as simple as possible: an educational example that shows how to split into stages and run it to get results.

Yes, this current PR is too much code to maintain. A simple example would be better.

>   • To get coverage without having to write/maintain a lot of new tests, we could consider running it against the sqllogictest tests (probably starting with a subset to make it feasible): essentially running sqllogictest tests under the distributed execution mechanism. This would help get coverage for rarer aggregation / window functions, join types, execution plan properties, etc. (which a smaller test, e.g. only running TPC-H, would miss).

Many of the sqllogictest tests only query a few rows, and some just evaluate literals. These may not be the best fit for distributed execution.

@andygrove andygrove changed the title from "Add opt-in datafusion-scheduler crate: an in-process model of distributed execution" to "Add an example of an in-process model of distributed execution" Jul 2, 2026
@andygrove

Member Author

This PR now adds an example rather than a new crate
