Add an example of an in-process model of distributed execution #23282
andygrove wants to merge 4 commits into
…ocess distributed execution

Add `datafusion-scheduler`, an opt-in (workspace-excluded) crate that runs a physical `ExecutionPlan` the way a distributed engine would, entirely in one process: the plan is split into stages at shuffle boundaries, each stage is serialized via datafusion-proto, each task is executed from a freshly deserialized plan with its own TaskContext, and data crosses stage boundaries as Arrow-IPC frames over a streaming in-memory exchange.

It models the execution contract common to Ballista and datafusion-distributed (serialize per stage, instantiate a fresh plan per task, execute one partition in isolation) without any network/gRPC, so it can serve as a small reference example and, once wired into CI, a regression guard for physical-plan changes that assume shared in-process state.

- Scheduler splits the plan at RepartitionExec(Hash) / CoalescePartitionsExec / SortPreservingMergeExec into a StageGraph (producers before consumers).
- ExchangeSinkExec / ExchangeSourceExec move data over an InMemoryExchange of bounded channels carrying Arrow-IPC frames; ExchangeCodec (de)serializes them.
- Spawn-all, no-barrier streaming executor; each task decodes its own plan with its own SessionContext / TaskContext.
- run_distributed + assert_distributed_eq differential harness vs single-node collect; unit + equivalence + streaming tests.

The crate is added to [workspace] exclude and only builds when explicitly targeted, so it does not affect normal builds or CI.
As an owner of a Ballista fork, this would definitely make our lives easier. I've done 10 DF upgrades so far, and they all bring pain.
Rather than adding a new crate, what would you think about adding this as a self-contained example to https://github.com/apache/datafusion/tree/main/datafusion-examples/examples? I think that would make it clearer that we don't plan to ship this, and it would also increase the surface area of our in-repo tests.
I would be fine with having it as an example, if we can still (eventually) have a CI test for it.
The motivation was to catch regressions that break the distributed execution model. |
We do run the examples as part of all CI runs. For example, here is the run for this PR:
Running as an example would make it difficult to run the tests, and the motivation of the PR was to catch regressions through these tests. It seems that there isn't much appetite for this approach, so I will close this PR. I will instead maintain a branch in Ballista that I keep up-to-date with latest DataFusion main so that we can catch regressions before release. Thanks for the feedback.
I really do like the idea of a (maybe small, contained) example of distributed DataFusion that runs in CI, e.g. with some tests that run some benchmarks and check correctness.
Maybe we just need some targeted unit tests to mock up some of the aspects of distributed execution? |
I don't understand this assertion. Why does an example make it hard to run tests? |
Maybe I misinterpreted your comment #23282 (comment). I thought you were suggesting making it an example so as to not increase the number of tests. I will reopen the PR and convert to an example, with tests, so we can discuss more. |
What I would like to avoid is yet another crate if we can avoid it (as I think a new crate would make it harder to understand what code was meant to be reused and what was meant to be examples).
…example

Address PR apache#23282 review feedback: fold the datafusion-scheduler crate into datafusion-examples/examples/scheduler/ as a runnable example, with all integration tests preserved as `#[cfg(test)]` modules so `cargo test --example scheduler` still exercises the isolated / serialized execution path.

- New example at `datafusion-examples/examples/scheduler/` mirrors the original crate layout (config, exchange/{sink,source,codec}, executor, scheduler, serde, stage, test_util) plus a `main.rs` runner exposing a `distributed_pipeline` subcommand.
- All tests folded inline: stage-splitting tests in `scheduler.rs`, exchange round-trip / backpressure / codec tests in `exchange/{mod,codec}.rs`, and the equivalence / error / single-stage / two-stage / streaming suites as separate test modules in `main.rs`. Same 18 passing + 2 ignored tests as before, plus a smoke test for the runnable example.
- Remove the `datafusion-scheduler/` crate and its workspace `exclude`.
- Add a "Scheduler Examples" entry to `datafusion-examples/README.md`.
We had some discussion about it (at Coralogix), two inputs:
Yes, this current PR is too much code to maintain. A simple example would be better.
Many of the |
datafusion-scheduler crate: an in-process model of distributed execution
This PR now adds an example rather than a new crate |
🧑 I work on Ballista, and I've been looking at datafusion-distributed lately too, and the same thing keeps biting distributed engines built on DataFusion: DataFusion's physical plans are written and tested assuming everything runs in one process, with all partitions of an operator polled together by cooperative threads on a shared runtime. Real distributed engines don't run that way — they cut the plan into stages, serialize each stage, and run each task in isolation with its own plan instance and its own task context. When a change quietly assumes the in-process model it still passes DataFusion's own tests, but it breaks Ballista and datafusion-distributed downstream, and we usually don't find out until much later.
This PR adds a small, opt-in crate that models that execution style in-process: it breaks a query into stages, serializes each stage, and executes tasks in isolation over an in-memory exchange — without actually building a distributed system (no network, no gRPC, no separate processes). I'm hoping it's useful for two things:
It's deliberately not trying to be another distributed engine — datafusion-distributed already fills that role well. This is meant to be the lightweight, dependency-free thing that can live in-tree and make life easier for the projects built on top of DataFusion.
Opening as a draft to get early feedback on whether this is worth having in the repo and on the overall approach.
🤖 The section below was drafted by an AI assistant to describe the change in detail.
Which issue does this PR close?
No existing issue — this is a draft to gauge interest. Happy to file a tracking issue if there's appetite for it.
Rationale for this change
DataFusion operators are developed against a single in-process plan tree: one `Arc<dyn ExecutionPlan>` instance, state shared via `Arc`, and all partitions of an operator polled together on one runtime. Distributed engines built on DataFusion violate every one of those assumptions:

`datafusion-proto`) and shipped elsewhere;

There is currently no test surface in DataFusion that exercises these assumptions, so a change that quietly depends on the in-process model compiles, passes CI, and only breaks once it reaches a downstream engine. This crate provides that surface, in-process and with no transport dependencies, so it can eventually run in DataFusion's own CI.
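To make the hazard concrete, here is a minimal std-only sketch of how state shared through an `Arc` can behave differently once each partition runs from its own plan instance. `ToyScan`, `in_process_total`, and `isolated_total` are hypothetical names invented for this illustration; they are not DataFusion APIs, and the model is a deliberate simplification of what a real operator does.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Toy stand-in for a scan operator (hypothetical, not a DataFusion type):
/// every partition of one plan instance pulls file work from a single queue
/// shared through an `Arc`.
struct ToyScan {
    work: Arc<Mutex<VecDeque<usize>>>, // one entry per file: its row count
    partitions: usize,
}

impl ToyScan {
    fn new(rows_per_file: &[usize]) -> Self {
        ToyScan {
            work: Arc::new(Mutex::new(rows_per_file.iter().copied().collect())),
            partitions: rows_per_file.len(),
        }
    }

    /// "Executes" one partition: keeps taking files from the shared queue
    /// until it is empty and returns the number of rows read.
    fn execute(&self, _partition: usize) -> usize {
        let mut rows = 0;
        loop {
            let next = self.work.lock().unwrap().pop_front();
            match next {
                Some(n) => rows += n,
                None => break,
            }
        }
        rows
    }
}

/// In-process model: one plan instance, every partition executed against it.
fn in_process_total(files: &[usize]) -> usize {
    let scan = ToyScan::new(files);
    (0..scan.partitions).map(|p| scan.execute(p)).sum()
}

/// Distributed model: each task builds a fresh instance (standing in for a
/// freshly deserialized plan) and executes exactly one partition.
fn isolated_total(files: &[usize]) -> usize {
    (0..files.len())
        .map(|p| ToyScan::new(files).execute(p))
        .sum()
}

fn main() {
    let files = [25, 25, 25, 25];
    println!("in-process rows: {}", in_process_total(&files));
    println!("isolated rows:   {}", isolated_total(&files));
}
```

With four 25-row files, the in-process path reads 100 rows in total, while the isolated path reads 400, because every isolated task sees a full, undrained queue. Nothing inside a single process exposes the difference, which is why a dedicated test surface is needed.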
What changes are included in this PR?
A new top-level crate, `datafusion-scheduler`, added to the workspace `exclude` list so it is not a workspace member: neither `cargo build` nor `cargo build --workspace` compiles it, and it has zero impact on normal builds/CI. It builds only when explicitly targeted (`cd datafusion-scheduler && cargo test`). The only change to existing files is adding the crate to `[workspace] exclude` in the root `Cargo.toml`.

How it works:
- The scheduler (`create_stages`) walks a physical-optimized `ExecutionPlan` and cuts a new stage at each `RepartitionExec(Hash)`, `CoalescePartitionsExec`, and `SortPreservingMergeExec`, producing a `StageGraph` whose stage ids are allocated so producers precede consumers.
- Each stage is serialized via `datafusion-proto`, and every task decodes its own fresh plan instance against its own `SessionContext` / `TaskContext`. No `Arc` execution state is shared between tasks; this is what reproduces the distributed model.
- `ExchangeSinkExec` hash-partitions its input and pushes Arrow-IPC-encoded frames into an `InMemoryExchange` of bounded channels; `ExchangeSourceExec` merges the producer channels and decodes them back. A custom `ExchangeCodec` (`PhysicalExtensionCodec`) serializes these two operators; the exchange itself is injected on decode, not serialized (the in-process analogue of a network endpoint).
- A differential harness: `run_distributed(ctx, plan, config)` plus `assert_distributed_eq`, which asserts the distributed result equals single-node `collect(plan)`.

It follows datafusion-distributed's streaming (no-disk, no-barrier) shape because that is the more general model, but the isolate-and-serialize contract it exercises is identical for Ballista, so a regression it catches applies to both.
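The stage-splitting step can be sketched on a toy plan tree. This is only an illustration under stated assumptions: `Plan`, `Stage`, `split`, `split_into_stages`, and `example_plan` are hypothetical names, the enum variants merely echo the operators named above, and the sketch omits the sink operator that the real producer stages are wrapped in. It is not the PR's `create_stages` implementation.

```rust
/// Toy physical plan (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    HashRepartition(Box<Plan>),    // shuffle boundary
    CoalescePartitions(Box<Plan>), // shuffle boundary
    /// Placeholder left in a consumer stage: "read the output of stage N".
    ExchangeSource(usize),
}

#[derive(Debug)]
struct Stage {
    id: usize,
    plan: Plan,
}

/// Post-order walk: children are split first, so every producer stage is
/// pushed (and numbered) before the stage that consumes it.
fn split(plan: Plan, stages: &mut Vec<Stage>) -> Plan {
    match plan {
        Plan::Scan(_) | Plan::ExchangeSource(_) => plan,
        Plan::Filter(c) => Plan::Filter(Box::new(split(*c, stages))),
        Plan::Aggregate(c) => Plan::Aggregate(Box::new(split(*c, stages))),
        Plan::HashRepartition(c) | Plan::CoalescePartitions(c) => {
            // Everything below the boundary becomes its own stage; the
            // consumer keeps only a reference to that stage's output.
            let producer = split(*c, stages);
            let id = stages.len();
            stages.push(Stage { id, plan: producer });
            Plan::ExchangeSource(id)
        }
    }
}

/// Splits a plan into stages; the last stage is the root of the query.
fn split_into_stages(plan: Plan) -> Vec<Stage> {
    let mut stages = Vec::new();
    let root = split(plan, &mut stages);
    let id = stages.len();
    stages.push(Stage { id, plan: root });
    stages
}

/// A two-phase aggregate: partial aggregate after a hash shuffle, final
/// aggregate after coalescing partitions.
fn example_plan() -> Plan {
    let scan = Plan::Filter(Box::new(Plan::Scan("t")));
    let partial = Plan::Aggregate(Box::new(Plan::HashRepartition(Box::new(scan))));
    Plan::Aggregate(Box::new(Plan::CoalescePartitions(Box::new(partial))))
}

fn main() {
    for stage in split_into_stages(example_plan()) {
        println!("stage {}: {:?}", stage.id, stage.plan);
    }
}
```

For the example plan this yields three stages: the filtered scan, the partial aggregate reading from stage 0, and the final aggregate reading from stage 1. Because ids are handed out in post-order, a stage only ever refers to stages with smaller ids, which is the "producers precede consumers" property described above.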
A regression it already caught
The isolated-scan guard (`regression_isolated_scan_reads_only_its_partition`) reproduces a real behavior: a 4-file CSV scan (100 rows) returns 4× the correct aggregate (`n=400`, `sum=64800` vs the true `100`/`16200`) under distributed execution, because the scan reads the whole table in each isolated per-partition task. It reproduces with a bare DataFusion plan and a single `execute(0, ..)` call, with no distributed code involved, and traces to the morsel-driven file scan (`datafusion/datasource/src/morsel/`) handing file work from a pool shared across a plan instance's partitions, drained correctly only when all partitions are polled together. This is exactly the class of in-process assumption that breaks any engine running plan partitions as isolated tasks. The test is `#[ignore]`d with its assertion intact rather than weakened, as a caught regression to triage.

Prior art / attribution
No code is copied from either project, but the design draws on prior art in the DataFusion ecosystem. The stage-splitting approach (`create_stages`) is modeled on Ballista's `DefaultDistributedPlanner::plan_query_stages_internal` (reimplemented and simplified). The plan-serialization codec follows the same `PhysicalExtensionCodec` pattern that both Ballista and datafusion-distributed use. The streaming, no-barrier in-memory exchange is inspired by datafusion-distributed's pull-based Arrow Flight model, reimplemented in-process without gRPC. Both projects are Apache-2.0.

Are these changes tested?
Yes, the crate is almost entirely tests. Unit tests cover stage splitting (boundaries, topological id order, multi-input wiring), the exchange operators (streaming round-trip, tight-backpressure/no-deadlock, codec round-trip with overflow guards and partitioning fidelity), and executor error propagation. Integration tests assert `assert_distributed_eq` for hash join, sort-merge join, sort, and multi-stage aggregate→sort queries, plus a tight-backpressure (`channel_capacity = 1`) end-to-end streaming test. `cargo test` in the crate is green (18 passing, 2 intentionally `#[ignore]`d), with `cargo clippy --all-targets -- -D warnings` and `cargo fmt` clean.

Are there any user-facing changes?
None. The crate is excluded from the workspace and nothing in DataFusion depends on it, so normal builds, published crates, and CI are unaffected. It is opt-in only.