Databricks workflow automatic repair airflow3#68358
Conversation
|
I assume this designed to solve #52280 ? |
Kind of. This issue is to restore UI buttons to manually repair a failure. This PR takes it a step further, allowing the Dag to recover by itself if configured correctly. As far as the failing checks, it seems to be a temporary outage, but only an Admin can re-run them. |
moomindani
left a comment
There was a problem hiding this comment.
Thanks — the single-writer repair model and the sync/deferrable parity are a clean design, and test coverage is thorough. Thanks too for clarifying the link to #52280 and flagging the transient PyPI 503. A couple of things before this lands:
1. Grace window asymmetry between the waiter and the coordinator (main question). I left an inline comment on WORKFLOW_REPAIR_GRACE_POLLS in triggers/databricks.py: the waiter's 3-poll (90s) tolerance is much tighter than the coordinator's workflow_repair_timeout (300s), so on a slow workspace the downstream task can be failed before the repair is reflected. Worth aligning both sides on a single wall-clock bound.
2. Real-environment validation. This is exactly the kind of flow — repair_run semantics, sub-run replacement, terminal-state eventual consistency — where mocked unit tests can't fully establish correctness. An airflow dags test run against a real Databricks workspace showing a transient sub-run failure → automatic repair → downstream task continuing in the same attempt would close the loop convincingly. Happy to help validate on a workspace if useful.
3. Minor: find_new_workflow_task_attempt selects the newest candidate by start_time and excludes the original run_id; worth a comment on the assumption that Databricks always populates start_time on repaired sub-runs, since the original_start_time is None fallback would otherwise match any non-original attempt.
|
Thanks for the review, good points all around. Here are the changes I've made:
|
moomindani
left a comment
There was a problem hiding this comment.
Thanks for the thorough revision. The earlier points are all addressed, and I validated the core repair flow end-to-end against a live Databricks workspace (repair_run, get_run(include_history=True), is_repair_reflected, find_new_workflow_task_attempt, and the full fail → repair → succeed cycle). The single-writer coordinator, the shared end_time-anchored deadline, and the workflow_repair_attempts == 0 no-op path all look sound. Repairing a Workflow run from Airflow is a real gap, so thanks for tackling it.
I'd like to open a design discussion rather than request changes — mostly about how this positions itself relative to the other recovery mechanisms in play.
Repair vs. native task retries. Lakeflow Jobs has two distinct recovery mechanisms: native task retries (max_retries / min_retry_interval_millis), which re-run a task in-flight for transient failures, and repair_run, which acts on an already-terminal run and re-runs only the failed/dependent tasks (repair docs, task config). They're complementary control planes. Today DatabricksWorkflowTaskGroup exposes neither — the generated task spec sets no retry fields — and this PR adds the repair path. Could you say a bit about the intended division of responsibility? Specifically: is repair meant to be the primary recovery mechanism here, or a complement to native task retries that might be exposed separately later? Framing this in the PR/discussion would help users reach for the right tool.
Overlap with Airflow's own retries. Airflow operators already have task-level retries, and combining those with a Databricks-side recovery loop can get confusing (double-retry, timing). I see the coordinator pins retries=0 and there's a warning about task-level retries — good. Could you confirm the full intended interaction when a user does set Airflow retries on the notebook operators alongside workflow_repair_attempts? I want to make sure the two retry models compose predictably.
Transient vs. non-transient failures. Because repair re-runs the failed tasks with the same code/config, automatically repairing a non-transient failure will tend to re-fail and burn through workflow_repair_attempts for the same reason — repair's design intent is post-root-cause-fix recovery. In testing I hit exactly this: a repair with no underlying state change re-failed, and only succeeded once the underlying condition changed. That's expected, but it means auto-repair is really targeting transient/infra failures (e.g. a run that went terminal on a cluster-level issue, where in-flight task retries couldn't help). Does that match your intended use case? If so, it's worth being explicit about it so users don't treat workflow_repair_attempts as a general-purpose retry knob.
None of this blocks the feature — it's about making sure the recovery model is coherent and clearly positioned. I've left a couple of smaller, concrete notes inline.
Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting
|
|
||
| After ``repair_run`` is accepted, Databricks needs a moment to drop the parent run out of its | ||
| terminal state. The coordinator polls every ``workflow_repair_polling_period`` seconds and gives | ||
| Databricks up to ``workflow_repair_timeout`` seconds (default 300s / 5 minutes) to |
There was a problem hiding this comment.
The default here is described as 300s / 5 minutes, but the code default for workflow_repair_timeout is 180 (3 minutes) — see _DatabricksWorkflowRepairCoordinatorOperator.__init__ (workflow_repair_timeout: int = 180). One of them should be corrected so the docs match the code.
Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting
| deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), | ||
| **kwargs, | ||
| ): | ||
| if workflow_repair_attempts < 1: |
There was a problem hiding this comment.
The coordinator and the waiter (DatabricksWorkflowRepairWaitTrigger) coordinate only via the shared end_time + workflow_repair_timeout deadline. If the coordinator detects the failure ~one workflow_repair_polling_period late and the repair isn't reflected before the waiter's deadline, the waiter can fail its task while Databricks actually goes on to repair (and possibly succeed) the run — leaving the Airflow task FAILED but the Databricks run repaired. With the defaults (180 vs 30) there's plenty of margin, so this only bites when workflow_repair_timeout is small relative to workflow_repair_polling_period.
A hard guard feels too strong here, since the safe margin really depends on Databricks' (non-deterministic) repair-reflection latency. But it might be worth emitting a log.warning(...) at construction time when workflow_repair_timeout is small relative to workflow_repair_polling_period (e.g. < polling_period * 2), so users get a heads-up at config time without being blocked. At minimum, a comment making this implicit relationship explicit would help.
Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting
Thanks, digging into these shifted how I think this should be positioned. Context on how I got here: the failures driving this were all transient (a ~5 min upstream-source outage, a library that failed to install, a Python kernel going unresponsive), and a job repair cleared each one. So that's what I reached for and what this PR builds. But with fresh eyes, perhaps native task retries may be the better tool here, and the task group exposes neither today. Repair vs. native retries. Complementary, split by cluster lifecycle. Native retries ( Interaction with Airflow Path forward. I'd like to split native retries into its own PR first and keep this one as a potential follow-up for the cases retries can't cover. Sound right, or would you rather both land together here with the positioning documented up front? Drafted-by: Claude Code (Opus 4.8); reviewed by @Beat-Nick before posting |
|
This is a great write-up — the cluster-lifecycle split (native retries = in-flight, same cluster; repair = terminal run, fresh cluster + failed/dependent tasks) is exactly the right mental model, and it matches the failures you described. I'd go with splitting: land native task retries ( Reasoning:
Two things worth folding into the native-retries PR while you're there:
That's my read as a reviewer; @eladkal may have a preference on sequencing too. Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting |
|
@Beat-Nick — the Static checks CI job is failing here, which needs a code fix on your side (a rerun won't clear it). You can reproduce and fix locally with: Once those pass and CI is green, it'll be ready for a maintainer to pick up. Thanks! See the PR quality criteria. Automated first-pass triage note drafted by an AI-assisted tool — may get things wrong; once addressed, a real Apache Airflow maintainer takes the next look. (why automated) Drafted-by: Claude Code (Opus 4.8); reviewed by @potiuk before posting |
|
I think the approach in #69182 is a better fit for my use case. It's a smaller, more targeted change while also incorporating several of the ideas discussed here. With that in mind, I'll be closing this PR for now. |
Enable Databricks Workflow Repair In Airflow 3
Adds automatic Databricks Workflow repair support to
DatabricksWorkflowTaskGroupon Airflow 3.Today, a transient Databricks task failure fails the Dag. Retrying the Airflow task does not recover the Databricks Workflow run: the retried task calls the Databricks API, sees that the existing run is still failed, and fails again.
When
workflow_repair_attempts > 0,DatabricksWorkflowTaskGroupinjects arepair_coordinatortask for the workflow run. After a workflow failure, the coordinator calls Databricksrepair_runso Databricks reruns the failed tasks and their dependent tasks in the existing workflow run.Downstream
DatabricksNotebookOperatortasks find the replacement sub-run for theirtask_keyand continue watching that run.Design
launchremains responsible only for starting the workflow run. It creates or resets the Databricks job, starts the run, stores the run link, and returns{conn_id, job_id, run_id}through XCom.repair_coordinatoris injected afterlaunchwhenworkflow_repair_attempts > 0. It owns the repair for the parent Databricks run. It watches the originalrun_iduntil the run reaches a terminal state:repair_run;The coordinator is the only task that calls
repair_run, so multiple failed Databricks sub-runs cannot race each other to repair the same parent workflow run. The first repair uses the originalrun_id; later repairs also passlatest_repair_idso Databricks continues the same repair chain.DatabricksNotebookOperatormonitors its Databricks sub-run. On failure, if workflow repair is enabled, it waits in the same Airflow task attempt for Databricks to expose a newer sub-run for the sametask_keyon the parent workflow run, then monitors that attempt. If no new attempt appears within the grace window, the task fails.The coordinator and
DatabricksNotebookOperatorboth support sync and deferrable execution. The behavior is the same in both modes:Both modes share the same repair payload builder and task-attempt selection helper, so Databricks API requests and replacement-task matching stay consistent across sync and deferrable execution.
When
workflow_repair_attempts == 0, the repair coordinator is not injected and existing Databricks operator behavior is unchanged.Synchronizing repair operations
To avoid treating stale Databricks state as final, and to keep the coordinator and the waiting tasks from disagreeing about how long Databricks is allowed to take, both sides share a single wall-clock deadline:
workflow_repair_timeoutseconds (default 180s / 3 minutes) measured from the parent run's terminalend_time, the value both sides read from the same Databricks run object;repair_runis accepted, the coordinator waits until that repair is reflected in the run before it does anything else, polling everyworkflow_repair_polling_periodseconds and giving up at the shared deadline;Because both sides anchor to the same
end_timeand the same timeout, they reach the same give-up instant regardless of when each one started polling — a downstream task is never failed while the coordinator could still produce a repair.Notes
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code + internally reviewed by my team.
Important
🛠️ Maintainer triage note for @Beat-Nick · by
@potiuk· 2026-06-22 06:31 UTCHelpful heads-up from the maintainers — please address before this PR can be reviewed (see the Pull Request quality criteria):
The ball is in your court — you've been assigned to this PR. Fix the above, then mark it Ready for review.
Automated triage — may be imperfect; a maintainer takes the next look.