Add Databricks-native retry settings to task operators#69182
Conversation
moomindani
left a comment
There was a problem hiding this comment.
Thanks — this lands the first half of the split we discussed on #68358 (native task retries first; repair as the scoped follow-up for what retries can't reach), and it follows the cluster-lifecycle framing from that thread. The docs also address the Airflow-retries-vs-native-retries interplay that came up there, including how Airflow retries behaves differently for standalone vs workflow-group operators — good.
I validated the core semantic assumption against a live Databricks workspace, since the monitoring design depends on how native max_retries attempts surface in the Jobs API.
The design's core assumption holds. A one-time runs/submit with an always-failing task and max_retries=1 observed via runs/get polling:
t= 0s run=RUNNING | task[retry_probe] run_id=942417452203963 attempt=0 PENDING
t= 30s run=RUNNING | attempt=0 TERMINATED/FAILED | run_id=381198635536160 attempt=1 RUNNING
t= 40s run=INTERNAL_ERROR/FAILED | both attempts TERMINATED/FAILED
Each retry appears as a separate tasks[] entry under the same task_key with a new run_id and an incremented attempt_number — so re-resolving the latest attempt on each poll is the right model, and treating a failed attempt as inconclusive while the parent run is still active is justified. Also verified: the three new fields are real Jobs API task-level fields; both injection paths (standalone _get_run_json reshape and _convert_to_databricks_workflow_task) are covered; None correctly omits fields; and the BLOCKED/WAITING_FOR_RETRY life-cycle-state additions are needed (previously RunState raised on them).
One substantive improvement and two minor test gaps:
- The documented tradeoff (a failed attempt inside a workflow group waits for the whole parent run) looks avoidable — see inline; the same probe shows why.
- The workflow-path conversion test covers only operator-arg retry settings on
DatabricksNotebookOperator;task_config-supplied values and the operator-arg-over-task_configprecedence are untested for the workflow path. - The reshape-without-native-retries case (
min_retry_interval_millisalone →tasks: [...]payload but_monitor_single_attemptmonitoring) has no test confirming_get_current_databricks_taskresolves on the reshaped submit run.
With this in, the repair follow-up from #68358 stays valuable for the cases retries can't reach (fresh-cluster recovery on a terminal run) — worth keeping that scoped as discussed.
Non-binding review.
Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting
| if run_state.is_successful: | ||
| return True | ||
| parent_state = RunState(**self._hook.get_run(workflow_run_id)["state"]) | ||
| return parent_state.is_terminal |
There was a problem hiding this comment.
This wait-for-parent gate is more conservative than it needs to be. runs/get exposes attempt_number per attempt (verified on a live workspace: attempts arrive as attempt=0, attempt=1, ... under the same task_key), and the operator knows its own max_retries (operator arg or task_config). So when the latest attempt is terminal-failed and attempt_number >= max_retries (with max_retries != -1), retries are exhausted and the failure is already conclusive — no need to keep the Airflow task waiting (or the trigger deferring) until every sibling task finishes, which with long-running siblings delays downstream failure handling by the length of the whole run. The parent-terminal fallback would then only be needed for max_retries=-1. If you take this, the "there is no per-task retries-exhausted signal" paragraph in notebook.rst/task.rst and this docstring should be updated to match.
Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting
|
|
||
| if isinstance(max_retries, bool) or max_retries is None: | ||
| return False | ||
| if isinstance(max_retries, str): |
There was a problem hiding this comment.
This str-coercion implies templated values were anticipated, but the new params are not in template_fields, so a Jinja string would reach the API unrendered. Either add the three params to template_fields (and keep this branch) or drop the str handling — as-is it only masks a misconfiguration.
Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting
Add Databricks-native retry settings to task operators
Summary
Adds first-class Databricks task retry settings to
DatabricksNotebookOperatorandDatabricksTaskOperator:max_retries,min_retry_interval_millis, andretry_on_timeout.These are Databricks task-level retries, not Airflow task retries. Databricks reruns the failed task attempt inside the same job run; Airflow
retriesrerun the operator.The payload shape change is gated on explicit retry configuration, so existing standalone tasks keep their current
runs/submitpayload unless users opt in by setting a Databricks retry field.This follows the recovery-model discussion in apache/airflow#68358: native task retries handle transient task failures first, while workflow repair remains separate follow-up work for run-level recovery.
Details
The retry fields live on Databricks Jobs API tasks, so the implementation sits in
DatabricksTaskBaseOperatorand applies to both standalone submits and tasks insideDatabricksWorkflowTaskGroup.For standalone
DatabricksNotebookOperatorandDatabricksTaskOperator,_get_run_json()switches to thetasks[]submit form only when a retry field is configured through operator arguments or, forDatabricksTaskOperator,task_config. This is required because Databricks ignores these fields at the top level ofruns/submit; they must be placed on aSubmitTask.Monitoring becomes retry-aware only when the effective Databricks
max_retriespermits another native attempt (-1or a positive integer). In that mode:task_keyand treat a failed attempt as final only after the parent workflow run is terminal.workflow_run_idanddatabricks_task_keytoDatabricksExecutionTrigger, soon_killcan cancel the latest retry attempt instead of a stale attempt id.Explicit settings that do not enable retries, such as
max_retries=0,retry_on_timeout=False, ormin_retry_interval_millisalone, still land in the task payload but keep existing single-attempt monitoring behavior.Changes
DatabricksNotebookOperatorandDatabricksTaskOperator.DatabricksTaskOperatorprecedence: direct operator arguments override matchingtask_configfields, and the operator-managedtask_keycannot be shadowed bytask_config.WAITING_FOR_RETRYandBLOCKEDas non-terminalRunStatelife cycle states.WAITING_FOR_RETRY.DatabricksSubmitRunOperatorandDatabricksCreateJobsOperatorremain raw payload pass-through operators; users can already set per-task retry fields in their task payloads.Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.8) following the guidelines