Skip to content

Databricks workflow automatic repair airflow3#68358

Closed
Beat-Nick wants to merge 16 commits into
apache:mainfrom
Beat-Nick:databricks-workflow-repair-airflow3
Closed

Databricks workflow automatic repair airflow3#68358
Beat-Nick wants to merge 16 commits into
apache:mainfrom
Beat-Nick:databricks-workflow-repair-airflow3

Conversation

@Beat-Nick

@Beat-Nick Beat-Nick commented Jun 10, 2026

Copy link
Copy Markdown

Enable Databricks Workflow Repair In Airflow 3

Adds automatic Databricks Workflow repair support to DatabricksWorkflowTaskGroup on Airflow 3.

Today, a transient Databricks task failure fails the Dag. Retrying the Airflow task does not recover the Databricks Workflow run: the retried task calls the Databricks API, sees that the existing run is still failed, and fails again.

When workflow_repair_attempts > 0, DatabricksWorkflowTaskGroup injects a repair_coordinator task for the workflow run. After a workflow failure, the coordinator calls Databricks repair_run so Databricks reruns the failed tasks and their dependent tasks in the existing workflow run.

Downstream DatabricksNotebookOperator tasks find the replacement sub-run for their task_key and continue watching that run.

task_group = DatabricksWorkflowTaskGroup(
    group_id="Example Workflow",
    databricks_conn_id=DATABRICKS_CONN_ID,
    workflow_repair_attempts=2,
    workflow_repair_polling_period=15,
    workflow_repair_timeout=180,
)
image

Design

launch remains responsible only for starting the workflow run. It creates or resets the Databricks job, starts the run, stores the run link, and returns {conn_id, job_id, run_id} through XCom.

repair_coordinator is injected after launch when workflow_repair_attempts > 0. It owns the repair for the parent Databricks run. It watches the original run_id until the run reaches a terminal state:

  • if the run succeeds, the coordinator succeeds;
  • if the run fails and repair budget remains, the coordinator calls repair_run;
  • if the run fails after the repair budget is exhausted, the coordinator fails.

The coordinator is the only task that calls repair_run, so multiple failed Databricks sub-runs cannot race each other to repair the same parent workflow run. The first repair uses the original run_id; later repairs also pass latest_repair_id so Databricks continues the same repair chain.

DatabricksNotebookOperator monitors its Databricks sub-run. On failure, if workflow repair is enabled, it waits in the same Airflow task attempt for Databricks to expose a newer sub-run for the same task_key on the parent workflow run, then monitors that attempt. If no new attempt appears within the grace window, the task fails.

The coordinator and DatabricksNotebookOperator both support sync and deferrable execution. The behavior is the same in both modes:

  • deferrable mode waits with triggers between Databricks polling cycles;
  • sync mode waits with inline polling loops.

Both modes share the same repair payload builder and task-attempt selection helper, so Databricks API requests and replacement-task matching stay consistent across sync and deferrable execution.

When workflow_repair_attempts == 0, the repair coordinator is not injected and existing Databricks operator behavior is unchanged.

Synchronizing repair operations

To avoid treating stale Databricks state as final, and to keep the coordinator and the waiting tasks from disagreeing about how long Databricks is allowed to take, both sides share a single wall-clock deadline:

  • the deadline is workflow_repair_timeout seconds (default 180s / 3 minutes) measured from the parent run's terminal end_time, the value both sides read from the same Databricks run object;
  • after repair_run is accepted, the coordinator waits until that repair is reflected in the run before it does anything else, polling every workflow_repair_polling_period seconds and giving up at the shared deadline;
  • a waiting task monitor uses the same deadline before declaring the parent permanently failed, and restarts the window if the run leaves its terminal state (a repair is in flight).

Because both sides anchor to the same end_time and the same timeout, they reach the same give-up instant regardless of when each one started polling — a downstream task is never failed while the coordinator could still produce a repair.

Notes

  • Downstream Airflow tasks stay in the same Airflow attempt while waiting for Databricks repair and monitoring the repaired sub-run.
  • Airflow 3 manual repair links remain out of scope for this PR. A future change could add a provider FastAPI endpoint and links, but that needs explicit race handling with the automatic coordinator.
Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code + internally reviewed by my team.


Important

🛠️ Maintainer triage note for @Beat-Nick · by @potiuk · 2026-06-22 06:31 UTC

Helpful heads-up from the maintainers — please address before this PR can be reviewed (see the Pull Request quality criteria):

  • Pre-commit / static checks — failing: CI image checks / Static checks.

The ball is in your court — you've been assigned to this PR. Fix the above, then mark it Ready for review.

Automated triage — may be imperfect; a maintainer takes the next look.

@eladkal

eladkal commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

I assume this designed to solve #52280 ?
cc @moomindani @mwojtyczka for review

@eladkal eladkal requested a review from pankajkoti June 11, 2026 08:09
@Beat-Nick

Copy link
Copy Markdown
Author

I assume this designed to solve #52280 ? cc @moomindani @mwojtyczka for review

Kind of. This issue is to restore UI buttons to manually repair a failure. This PR takes it a step further, allowing the Dag to recover by itself if configured correctly.

As far as the failing checks, it seems to be a temporary outage, but only an Admin can re-run them.

requests.exceptions.HTTPError: 503 Server Error: Backend is unhealthy
for url: https://pypi.org/pypi?:action=list_classifiers

@moomindani moomindani left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — the single-writer repair model and the sync/deferrable parity are a clean design, and test coverage is thorough. Thanks too for clarifying the link to #52280 and flagging the transient PyPI 503. A couple of things before this lands:

1. Grace window asymmetry between the waiter and the coordinator (main question). I left an inline comment on WORKFLOW_REPAIR_GRACE_POLLS in triggers/databricks.py: the waiter's 3-poll (90s) tolerance is much tighter than the coordinator's workflow_repair_timeout (300s), so on a slow workspace the downstream task can be failed before the repair is reflected. Worth aligning both sides on a single wall-clock bound.

2. Real-environment validation. This is exactly the kind of flow — repair_run semantics, sub-run replacement, terminal-state eventual consistency — where mocked unit tests can't fully establish correctness. An airflow dags test run against a real Databricks workspace showing a transient sub-run failure → automatic repair → downstream task continuing in the same attempt would close the loop convincingly. Happy to help validate on a workspace if useful.

3. Minor: find_new_workflow_task_attempt selects the newest candidate by start_time and excludes the original run_id; worth a comment on the assumption that Databricks always populates start_time on repaired sub-runs, since the original_start_time is None fallback would otherwise match any non-original attempt.

Comment thread providers/databricks/src/airflow/providers/databricks/triggers/databricks.py Outdated
@Beat-Nick

Copy link
Copy Markdown
Author

Thanks for the review, good points all around. Here are the changes I've made:

  1. Grace-window asymmetry. Both sides now share one wall-clock. I dropped the WORKFLOW_REPAIR_GRACE_POLLS counter. The coordinator and wait trigger both derive their deadline from compute_repair_deadline(), which is the parent run's terminal end_time plus a shared workflow_repair_timeout, so they expire together and the waiter can't fail the downstream task before the repair window elapses. I also dropped the default from 300s to 180s, since the repair API usually responds in milliseconds. Full details in a new subsection of the PR description.

  2. Real-environment validation. I've tested airflow Dags against my Databricks workspace: sync, deferrable, max-repair threshold, and a job with mixed operator types. All behaved as expected. I'm happy to capture a specific run if useful.

  3. start_time assumption. Rather than just commenting on the assumption, I made the code enforce it: the function now requires a populated start_time, so a not-yet-started attempt yields None and we keep polling instead of latching onto it. This matters most in the original_start_time is None case — without the requirement, that branch would match any non-original attempt, including one Databricks has accepted but not yet started. start_time is set when the run is accepted (even while the cluster is still booting).

@moomindani moomindani left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough revision. The earlier points are all addressed, and I validated the core repair flow end-to-end against a live Databricks workspace (repair_run, get_run(include_history=True), is_repair_reflected, find_new_workflow_task_attempt, and the full fail → repair → succeed cycle). The single-writer coordinator, the shared end_time-anchored deadline, and the workflow_repair_attempts == 0 no-op path all look sound. Repairing a Workflow run from Airflow is a real gap, so thanks for tackling it.

I'd like to open a design discussion rather than request changes — mostly about how this positions itself relative to the other recovery mechanisms in play.

Repair vs. native task retries. Lakeflow Jobs has two distinct recovery mechanisms: native task retries (max_retries / min_retry_interval_millis), which re-run a task in-flight for transient failures, and repair_run, which acts on an already-terminal run and re-runs only the failed/dependent tasks (repair docs, task config). They're complementary control planes. Today DatabricksWorkflowTaskGroup exposes neither — the generated task spec sets no retry fields — and this PR adds the repair path. Could you say a bit about the intended division of responsibility? Specifically: is repair meant to be the primary recovery mechanism here, or a complement to native task retries that might be exposed separately later? Framing this in the PR/discussion would help users reach for the right tool.

Overlap with Airflow's own retries. Airflow operators already have task-level retries, and combining those with a Databricks-side recovery loop can get confusing (double-retry, timing). I see the coordinator pins retries=0 and there's a warning about task-level retries — good. Could you confirm the full intended interaction when a user does set Airflow retries on the notebook operators alongside workflow_repair_attempts? I want to make sure the two retry models compose predictably.

Transient vs. non-transient failures. Because repair re-runs the failed tasks with the same code/config, automatically repairing a non-transient failure will tend to re-fail and burn through workflow_repair_attempts for the same reason — repair's design intent is post-root-cause-fix recovery. In testing I hit exactly this: a repair with no underlying state change re-failed, and only succeeded once the underlying condition changed. That's expected, but it means auto-repair is really targeting transient/infra failures (e.g. a run that went terminal on a cluster-level issue, where in-flight task retries couldn't help). Does that match your intended use case? If so, it's worth being explicit about it so users don't treat workflow_repair_attempts as a general-purpose retry knob.

None of this blocks the feature — it's about making sure the recovery model is coherent and clearly positioned. I've left a couple of smaller, concrete notes inline.


Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting


After ``repair_run`` is accepted, Databricks needs a moment to drop the parent run out of its
terminal state. The coordinator polls every ``workflow_repair_polling_period`` seconds and gives
Databricks up to ``workflow_repair_timeout`` seconds (default 300s / 5 minutes) to

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default here is described as 300s / 5 minutes, but the code default for workflow_repair_timeout is 180 (3 minutes) — see _DatabricksWorkflowRepairCoordinatorOperator.__init__ (workflow_repair_timeout: int = 180). One of them should be corrected so the docs match the code.


Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting

deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
if workflow_repair_attempts < 1:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coordinator and the waiter (DatabricksWorkflowRepairWaitTrigger) coordinate only via the shared end_time + workflow_repair_timeout deadline. If the coordinator detects the failure ~one workflow_repair_polling_period late and the repair isn't reflected before the waiter's deadline, the waiter can fail its task while Databricks actually goes on to repair (and possibly succeed) the run — leaving the Airflow task FAILED but the Databricks run repaired. With the defaults (180 vs 30) there's plenty of margin, so this only bites when workflow_repair_timeout is small relative to workflow_repair_polling_period.

A hard guard feels too strong here, since the safe margin really depends on Databricks' (non-deterministic) repair-reflection latency. But it might be worth emitting a log.warning(...) at construction time when workflow_repair_timeout is small relative to workflow_repair_polling_period (e.g. < polling_period * 2), so users get a heads-up at config time without being blocked. At minimum, a comment making this implicit relationship explicit would help.


Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting

@Beat-Nick

Beat-Nick commented Jun 18, 2026

Copy link
Copy Markdown
Author

I'd like to open a design discussion rather than request changes — mostly about how this positions itself relative to the other recovery mechanisms in play.

Thanks, digging into these shifted how I think this should be positioned.

Context on how I got here: the failures driving this were all transient (a ~5 min upstream-source outage, a library that failed to install, a Python kernel going unresponsive), and a job repair cleared each one. So that's what I reached for and what this PR builds. But with fresh eyes, perhaps native task retries may be the better tool here, and the task group exposes neither today.

Repair vs. native retries. Complementary, split by cluster lifecycle. Native retries (max_retries / min_retry_interval_millis) re-run the failed task in-flight on the same cluster; repair_run acts on a terminal run, so it gets a fresh cluster and can re-run failed and dependent tasks. For the three failures above, retries are the better primary tool. So retries as first line, workflow_repair_attempts as the run-level backstop for what retries can't reach (fresh-cluster recovery on a degraded driver/node).

Interaction with Airflow retries. If retries are set on the task level, they will work how they today, which is admittedly confusing. On a failure Airflow retries only re-run the monitor, where it finds nothing to repair and fails again.

Path forward. I'd like to split native retries into its own PR first and keep this one as a potential follow-up for the cases retries can't cover. Sound right, or would you rather both land together here with the positioning documented up front?


Drafted-by: Claude Code (Opus 4.8); reviewed by @Beat-Nick before posting

@moomindani

Copy link
Copy Markdown
Contributor

This is a great write-up — the cluster-lifecycle split (native retries = in-flight, same cluster; repair = terminal run, fresh cluster + failed/dependent tasks) is exactly the right mental model, and it matches the failures you described.

I'd go with splitting: land native task retries (max_retries / min_retry_interval_millis on the task-group task spec) as its own PR first, and keep this repair PR as the scoped follow-up for what retries can't reach (fresh-cluster recovery on a degraded driver/node).

Reasoning:

  • Native retries is the smaller, lower-risk change and covers the majority of the real cases you hit (transient source outage, flaky install, unresponsive kernel), so it delivers most of the value on its own and is easy to review.
  • This repair PR is substantial (coordinator injection, sync/deferrable parity, the shared-deadline coordination). Landing it after retries exist lets it be scoped precisely to the fresh-cluster case and reviewed on its own merits, instead of carrying the "why not just retries?" question.

Two things worth folding into the native-retries PR while you're there:

  • Document the Airflow retries interaction. As you noted, Airflow task-level retries in the task group just re-run the monitor (no-op against an already-terminal sub-run), so users should reach for the Databricks-side max_retries instead. Calling that out will save people the exact confusion you described.
  • A line on the retries-vs-repair boundary — which failure classes retries cover vs. which actually need a fresh cluster — so the follow-up's scope is clear up front.

That's my read as a reviewer; @eladkal may have a preference on sequencing too.


Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting

@potiuk

potiuk commented Jun 25, 2026

Copy link
Copy Markdown
Member

@Beat-Nick — the Static checks CI job is failing here, which needs a code fix on your side (a rerun won't clear it). You can reproduce and fix locally with:

pre-commit run --all-files     # or: breeze static-checks

Once those pass and CI is green, it'll be ready for a maintainer to pick up. Thanks!

See the PR quality criteria.

Automated first-pass triage note drafted by an AI-assisted tool — may get things wrong; once addressed, a real Apache Airflow maintainer takes the next look. (why automated)


Drafted-by: Claude Code (Opus 4.8); reviewed by @potiuk before posting

@Beat-Nick

Copy link
Copy Markdown
Author

I think the approach in #69182 is a better fit for my use case. It's a smaller, more targeted change while also incorporating several of the ideas discussed here. With that in mind, I'll be closing this PR for now.

@Beat-Nick Beat-Nick closed this Jun 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants