Skip to content

Honor catchup for historical asset events in asset-triggered Dags#68749

Merged
shahar1 merged 2 commits into
apache:mainfrom
shahar1:fix-new-asset-dag-historical-events
Jul 1, 2026
Merged

Honor catchup for historical asset events in asset-triggered Dags#68749
shahar1 merged 2 commits into
apache:mainfrom
shahar1:fix-new-asset-dag-historical-events

Conversation

@shahar1

@shahar1 shahar1 commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Human Summary

closes: #39456
related: #39603

Asset-triggered Dags now honor catchup for historical asset events. With catchup=False (the default), a Dag newly added to assets that already have event history no longer replays the backlog on its first run - the event window starts when the Dag began scheduling on those assets. With catchup=True, the backlog is still consumed.

AI Summary

Click here

Bug. In SchedulerJobRunner._create_dag_runs_asset_triggered, the set of asset events attached to a run is bounded below by the previous asset-triggered run's run_after, falling back to date.min when there is no previous run. For a Dag's very first run this means every asset event ever recorded for its assets is consumed — so a Dag newly added to assets that already have history reprocesses the entire backlog on its first run, unconditionally.

Fix. Gate the backlog behind the Dag's catchup flag (as discussed in the PR thread — catchup already expresses "should history be considered when the Dag is newly added"). When catchup is off, the Dag's earliest schedule-reference created_at is inserted into the coalesce fallback chain before date.min:

event_window_floor: list[Any] = [cte.c.previous_dag_run_run_after]
if not dag.catchup:
    event_window_floor.append(
        select(func.min(DagScheduleAssetReference.created_at))
        .where(DagScheduleAssetReference.dag_id == dag.dag_id)
        .scalar_subquery()
    )
event_window_floor.append(date.min)

So with catchup=False the first run's window is floored at the moment the Dag started scheduling on its assets; with catchup=True the floor stays date.min and the full history replays, matching catchup semantics for time-scheduled Dags. Subsequent runs are unchanged either way (still bounded by the previous run's run_after).

created_at is a reliable cut-off because schedule references are updated in place during parsing (dag_processing/collection.py), not deleted and recreated, so it survives re-serialization.

Scope. Direct asset references only. The asset-alias path is deliberately left on date.min: an alias consumer is expected to pick up events attached to its alias regardless of timing (covered by test_create_dag_runs_asset_alias_with_asset_event_attached).

This is the same fix idea as the stale 2.x PR #39603, re-expressed for the current SQL-native query (a scalar subquery over a single coalesce, avoiding an implicit cartesian join) and now gated by catchup.

Validation.

  • New parametrized regression test test_new_asset_triggered_dag_backlog_gated_by_catchup covering both sides: catchup=False ignores the pre-creation backlog (fails on main), catchup=True consumes it.
  • Updated test_create_dag_runs_assets, which previously encoded the buggy behavior.
  • Newsfragment airflow-core/newsfragments/68749.bugfix.rst describing the behavior change.

Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.8, 1M context)

Generated-by: Claude Code (Opus 4.8, 1M context) following the guidelines

@shahar1 shahar1 requested review from XD-DENG and ashb as code owners June 19, 2026 11:58
@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label Jun 19, 2026
@shahar1 shahar1 force-pushed the fix-new-asset-dag-historical-events branch from 3e0da08 to 7dd9d3c Compare June 19, 2026 11:59
@shahar1 shahar1 requested review from kaxil and uranusjr June 19, 2026 12:01

@vincbeck vincbeck left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise LGTM

Comment thread airflow-core/newsfragments/68749.significant.rst Outdated
When an asset-triggered Dag is added to a deployment whose assets already
have event history, its first run swept in every asset event ever recorded
because the event window fell back to date.min when no previous run existed.
A brand-new consumer therefore reprocessed the entire backlog on first run.

Bound the first run's window at the time the Dag started scheduling on its
assets (the schedule reference's created_at, which is preserved across
re-parsing) so a new Dag only receives events that occurred after it began
consuming the asset.

Closes: apache#39456
@shahar1 shahar1 force-pushed the fix-new-asset-dag-historical-events branch from 7dd9d3c to 804d738 Compare June 19, 2026 14:33
@shahar1 shahar1 requested a review from vincbeck June 19, 2026 14:34
@uranusjr

Copy link
Copy Markdown
Member

I wonder if the dag’s catchup should have a say here. The flag essentially means whether history should be considered when the dag is newly added, isn’t it?

@shahar1

shahar1 commented Jun 22, 2026

Copy link
Copy Markdown
Contributor Author

I wonder if the dag’s catchup should have a say here. The flag essentially means whether history should be considered when the dag is newly added, isn’t it?

If we want to allow such a feature, I wouldn't overload the existing catchup flag for that, because the semantics are different - by catchup you refer to complete intervals between start_date and now, which is inherently different than generally timeless assets.
Personally I don't find it too useful either. An asset refers to the current state of a dataset, not a snapshot in time - so there's nothing to replay when "catching up": re-running for a 3-month-old event would just read today's data anyway, so it's no different from a normal run now.

@uranusjr

Copy link
Copy Markdown
Member

re-running for a 3-month-old event would just read today's data anyway

This entirely depends on how the task actually reads data, doesn’t it? If it just reads the current data, the same argument can be made against scheduled catchups too.

For catchup to work, you need to implement the task in a way that makes catchup possible in the first place. With scheduled dags, this means it should use the data interval or logical date. With assets, it should use the asset event date, which would be in the past. I don’t think this argument holds.

@shahar1

shahar1 commented Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

re-running for a 3-month-old event would just read today's data anyway

This entirely depends on how the task actually reads data, doesn’t it? If it just reads the current data, the same argument can be made against scheduled catchups too.

For catchup to work, you need to implement the task in a way that makes catchup possible in the first place. With scheduled dags, this means it should use the data interval or logical date. With assets, it should use the asset event date, which would be in the past. I don’t think this argument holds.

Gotcha, it makes sense - I've probably overstated.
Assuming that it's a valid case and we gate this behavior with a flag:

  1. Would we rather use the existing catchup flag or create a new one? I tend towards a new flag (with a different name, like consume_asset_history), because of the different semantics and to retain compatibility with current behavior (see next).
  2. What should be the default behavior, now and in the future? Considering that it's a technically valid use case, I tend towards defaulting it to True to avoid immediate breaking changes (with a future warning when unset), but flip it in Airflow 3.4+ / 4.0+ for parity with the catchup's default.

WDYT? I'm fine with putting the above for a discussion and lazy consensus in the dev list.

@uranusjr

Copy link
Copy Markdown
Member

I actually think we can just use catchup because

  1. If we introduce a new argument for asset-triggered dags, the argument would not be useful for time-scheduled dags. Conversely, catchup would not make sense for asset-triggered dags. This seems wasteful when catchup adequately describes the behavior we want for asset-triggering.
  2. I do not think the current behavior (basically unconditionally setting catchup=True for all asset-triggered dags) is useful for anyone for compatibility to be a requirement. Making this configurable with the catchup argument can arguably be considered a bug-fix. Since catchup only matters for a dag newly introduced to the scheduler, effects on existing dags is minimal. The only case this can cause breakage is if a dag with catchup=False is currently paused, and then unpaused after upgrade—this would make the dag ignore the asset events during the paused period. But this is arguably the correct behavior anyway.

Bounding the first-run event window unconditionally removed any way for a
newly added asset-triggered Dag to intentionally replay an asset's full
history. Reusing the existing catchup flag — which already means "consider
history when the Dag is newly added" — restores that choice: catchup off
(the default) skips the backlog, catchup on replays it.
@shahar1 shahar1 changed the title Stop new asset-triggered Dags from consuming historical asset events Honor catchup for historical asset events in asset-triggered Dags Jul 1, 2026
@shahar1 shahar1 added the backport-to-v3-3-test Backport to v3-3-test label Jul 1, 2026
@shahar1 shahar1 merged commit ff10b2e into apache:main Jul 1, 2026
77 checks passed
@github-actions github-actions Bot added this to the Airflow 3.3.1 milestone Jul 1, 2026
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Hi maintainer, this PR was merged without a milestone set.
We've automatically set the milestone to Airflow 3.3.1 based on: backport label targeting v3-3-test
If this milestone is not correct, please update it to the appropriate milestone.

This comment was generated by Milestone Tag Assistant.

@shahar1 shahar1 deleted the fix-new-asset-dag-historical-events branch July 1, 2026 21:26
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Backport successfully created: v3-3-test

Note: As of Merging PRs targeted for Airflow 3.X
the committer who merges the PR is responsible for backporting the PRs that are bug fixes (generally speaking) to the maintenance branches.

In matter of doubt please ask in #release-management Slack channel.

Status Branch Result
v3-3-test PR Link

vatsrahul1001 pushed a commit that referenced this pull request Jul 3, 2026
…red Dags (#68749)

(cherry picked from commit ff10b2e)

Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>
potiuk added a commit that referenced this pull request Jul 3, 2026
…red Dags (#68749) (#69224)

(cherry picked from commit ff10b2e)

Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler backport-to-v3-3-test Backport to v3-3-test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DAGs are able to see historical dataset events when created new

3 participants