Skip to content

Add Databricks-native retry settings to task operators#69182

Open
Beat-Nick wants to merge 1 commit into
apache:mainfrom
Beat-Nick:databricks-expose-task-repair-params
Open

Add Databricks-native retry settings to task operators#69182
Beat-Nick wants to merge 1 commit into
apache:mainfrom
Beat-Nick:databricks-expose-task-repair-params

Conversation

@Beat-Nick

Copy link
Copy Markdown

Add Databricks-native retry settings to task operators

Summary

Adds first-class Databricks task retry settings to DatabricksNotebookOperator and DatabricksTaskOperator: max_retries, min_retry_interval_millis, and retry_on_timeout.

These are Databricks task-level retries, not Airflow task retries. Databricks reruns the failed task attempt inside the same job run; Airflow retries rerun the operator.

The payload shape change is gated on explicit retry configuration, so existing standalone tasks keep their current runs/submit payload unless users opt in by setting a Databricks retry field.

This follows the recovery-model discussion in apache/airflow#68358: native task retries handle transient task failures first, while workflow repair remains separate follow-up work for run-level recovery.

Details

The retry fields live on Databricks Jobs API tasks, so the implementation sits in DatabricksTaskBaseOperator and applies to both standalone submits and tasks inside DatabricksWorkflowTaskGroup.

For standalone DatabricksNotebookOperator and DatabricksTaskOperator, _get_run_json() switches to the tasks[] submit form only when a retry field is configured through operator arguments or, for DatabricksTaskOperator, task_config. This is required because Databricks ignores these fields at the top level of runs/submit; they must be placed on a SubmitTask.

Monitoring becomes retry-aware only when the effective Databricks max_retries permits another native attempt (-1 or a positive integer). In that mode:

  • Standalone operators wait on the submit run, whose terminal state includes all Databricks retry attempts.
  • Workflow task operators re-resolve the latest attempt for the same task_key and treat a failed attempt as final only after the parent workflow run is terminal.
  • Deferrable workflow monitoring passes workflow_run_id and databricks_task_key to DatabricksExecutionTrigger, so on_kill can cancel the latest retry attempt instead of a stale attempt id.

Explicit settings that do not enable retries, such as max_retries=0, retry_on_timeout=False, or min_retry_interval_millis alone, still land in the task payload but keep existing single-attempt monitoring behavior.

Changes

  • Adds retry settings to DatabricksNotebookOperator and DatabricksTaskOperator.
  • Preserves DatabricksTaskOperator precedence: direct operator arguments override matching task_config fields, and the operator-managed task_key cannot be shadowed by task_config.
  • Updates sync and deferrable monitoring to wait for the final Databricks retry outcome.
  • Accepts WAITING_FOR_RETRY and BLOCKED as non-terminal RunState life cycle states.
  • Adds tests for payload generation, argument precedence, sync and deferrable monitoring, trigger serialization, and waiting through WAITING_FOR_RETRY.

DatabricksSubmitRunOperator and DatabricksCreateJobsOperator remain raw payload pass-through operators; users can already set per-task retry fields in their task payloads.

Was generative AI tooling used to co-author this PR?
  • Yes - Claude Code (Opus 4.8)

Generated-by: Claude Code (Opus 4.8) following the guidelines

@moomindani moomindani left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — this lands the first half of the split we discussed on #68358 (native task retries first; repair as the scoped follow-up for what retries can't reach), and it follows the cluster-lifecycle framing from that thread. The docs also address the Airflow-retries-vs-native-retries interplay that came up there, including how Airflow retries behaves differently for standalone vs workflow-group operators — good.

I validated the core semantic assumption against a live Databricks workspace, since the monitoring design depends on how native max_retries attempts surface in the Jobs API.

The design's core assumption holds. A one-time runs/submit with an always-failing task and max_retries=1 observed via runs/get polling:

t=  0s  run=RUNNING | task[retry_probe] run_id=942417452203963 attempt=0 PENDING
t= 30s  run=RUNNING | attempt=0 TERMINATED/FAILED | run_id=381198635536160 attempt=1 RUNNING
t= 40s  run=INTERNAL_ERROR/FAILED | both attempts TERMINATED/FAILED

Each retry appears as a separate tasks[] entry under the same task_key with a new run_id and an incremented attempt_number — so re-resolving the latest attempt on each poll is the right model, and treating a failed attempt as inconclusive while the parent run is still active is justified. Also verified: the three new fields are real Jobs API task-level fields; both injection paths (standalone _get_run_json reshape and _convert_to_databricks_workflow_task) are covered; None correctly omits fields; and the BLOCKED/WAITING_FOR_RETRY life-cycle-state additions are needed (previously RunState raised on them).

One substantive improvement and two minor test gaps:

  • The documented tradeoff (a failed attempt inside a workflow group waits for the whole parent run) looks avoidable — see inline; the same probe shows why.
  • The workflow-path conversion test covers only operator-arg retry settings on DatabricksNotebookOperator; task_config-supplied values and the operator-arg-over-task_config precedence are untested for the workflow path.
  • The reshape-without-native-retries case (min_retry_interval_millis alone → tasks: [...] payload but _monitor_single_attempt monitoring) has no test confirming _get_current_databricks_task resolves on the reshaped submit run.

With this in, the repair follow-up from #68358 stays valuable for the cases retries can't reach (fresh-cluster recovery on a terminal run) — worth keeping that scoped as discussed.

Non-binding review.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

if run_state.is_successful:
return True
parent_state = RunState(**self._hook.get_run(workflow_run_id)["state"])
return parent_state.is_terminal

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wait-for-parent gate is more conservative than it needs to be. runs/get exposes attempt_number per attempt (verified on a live workspace: attempts arrive as attempt=0, attempt=1, ... under the same task_key), and the operator knows its own max_retries (operator arg or task_config). So when the latest attempt is terminal-failed and attempt_number >= max_retries (with max_retries != -1), retries are exhausted and the failure is already conclusive — no need to keep the Airflow task waiting (or the trigger deferring) until every sibling task finishes, which with long-running siblings delays downstream failure handling by the length of the whole run. The parent-terminal fallback would then only be needed for max_retries=-1. If you take this, the "there is no per-task retries-exhausted signal" paragraph in notebook.rst/task.rst and this docstring should be updated to match.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting


if isinstance(max_retries, bool) or max_retries is None:
return False
if isinstance(max_retries, str):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This str-coercion implies templated values were anticipated, but the new params are not in template_fields, so a Jinja string would reach the API unrendered. Either add the three params to template_fields (and keep this branch) or drop the str handling — as-is it only masks a misconfiguration.


Drafted-by: Claude Code (Fable 5); reviewed by @moomindani before posting

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants