From 804d7382e74f7a40e6b8b758ee4e9938d236af7c Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:57:15 +0300 Subject: [PATCH 1/2] Stop new asset-triggered Dags from consuming historical asset events When an asset-triggered Dag is added to a deployment whose assets already have event history, its first run swept in every asset event ever recorded because the event window fell back to date.min when no previous run existed. A brand-new consumer therefore reprocessed the entire backlog on first run. Bound the first run's window at the time the Dag started scheduling on its assets (the schedule reference's created_at, which is preserved across re-parsing) so a new Dag only receives events that occurred after it began consuming the asset. Closes: #39456 --- airflow-core/newsfragments/68749.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 9 +- .../tests/unit/jobs/test_scheduler_job.py | 116 ++++++++++++++---- 3 files changed, 101 insertions(+), 25 deletions(-) create mode 100644 airflow-core/newsfragments/68749.bugfix.rst diff --git a/airflow-core/newsfragments/68749.bugfix.rst b/airflow-core/newsfragments/68749.bugfix.rst new file mode 100644 index 0000000000000..8e883753d527f --- /dev/null +++ b/airflow-core/newsfragments/68749.bugfix.rst @@ -0,0 +1 @@ +New asset-triggered Dags no longer consume historical asset events recorded before the Dag started scheduling on those assets; the first run is now bounded at the moment the schedule reference was created instead of reprocessing the entire backlog (#39456). diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 93b155f2257ac..67c3bea9ddc86 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2592,7 +2592,14 @@ def _create_dag_runs_asset_triggered( ), ), AssetEvent.timestamp <= triggered_date, - AssetEvent.timestamp > func.coalesce(cte.c.previous_dag_run_run_after, date.min), + AssetEvent.timestamp + > func.coalesce( + cte.c.previous_dag_run_run_after, + select(func.min(DagScheduleAssetReference.created_at)) + .where(DagScheduleAssetReference.dag_id == dag.dag_id) + .scalar_subquery(), + date.min, + ), ) .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc()) ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b312f9fd1b0a8..1ad0e314d9717 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -65,6 +65,7 @@ AssetEvent, AssetModel, AssetPartitionDagRun, + DagScheduleAssetReference, PartitionedAssetKeyLog, ) from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill @@ -5501,51 +5502,59 @@ def test_create_dag_runs_assets(self, session, dag_maker): with dag_maker(dag_id="assets-1", start_date=timezone.utcnow(), session=session): BashOperator(task_id="task", bash_command="echo 1", outlets=[asset1]) - dr = dag_maker.create_dagrun( + dr1 = dag_maker.create_dagrun( run_id="run1", logical_date=(DEFAULT_DATE + timedelta(days=100)), data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + timedelta(days=11)), ) + dr2 = dag_maker.create_dagrun( + run_id="run2", + logical_date=(DEFAULT_DATE + timedelta(days=101)), + data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)), + ) asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset1.uri)) + # Consumer Dags are created before the events, so the events fall within their window. + with dag_maker(dag_id="assets-consumer-multiple", schedule=[asset1, asset2]): + pass + dag2 = dag_maker.dag + with dag_maker(dag_id="assets-consumer-single", schedule=[asset1]): + pass + dag3 = dag_maker.dag + + base = session.scalar( + select(DagScheduleAssetReference.created_at).where( + DagScheduleAssetReference.dag_id == dag3.dag_id + ) + ) event1 = AssetEvent( asset_id=asset1_id, source_task_id="task", - source_dag_id=dr.dag_id, - source_run_id=dr.run_id, + source_dag_id=dr1.dag_id, + source_run_id=dr1.run_id, source_map_index=-1, + timestamp=base + timedelta(seconds=1), ) - session.add(event1) - - # Create a second event, creation time is more recent, but data interval is older - dr = dag_maker.create_dagrun( - run_id="run2", - logical_date=(DEFAULT_DATE + timedelta(days=101)), - data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)), - ) - event2 = AssetEvent( asset_id=asset1_id, source_task_id="task", - source_dag_id=dr.dag_id, - source_run_id=dr.run_id, + source_dag_id=dr2.dag_id, + source_run_id=dr2.run_id, source_map_index=-1, + timestamp=base + timedelta(seconds=2), ) - session.add(event2) - - with dag_maker(dag_id="assets-consumer-multiple", schedule=[asset1, asset2]): - pass - dag2 = dag_maker.dag - with dag_maker(dag_id="assets-consumer-single", schedule=[asset1]): - pass - dag3 = dag_maker.dag + session.add_all([event1, event2]) session = dag_maker.session session.add_all( [ - AssetDagRunQueue(asset_id=asset1_id, target_dag_id=dag2.dag_id), - AssetDagRunQueue(asset_id=asset1_id, target_dag_id=dag3.dag_id), + AssetDagRunQueue( + asset_id=asset1_id, target_dag_id=dag2.dag_id, created_at=base + timedelta(hours=1) + ), + AssetDagRunQueue( + asset_id=asset1_id, target_dag_id=dag3.dag_id, created_at=base + timedelta(hours=1) + ), ] ) session.flush() @@ -5591,6 +5600,65 @@ def dict_from_obj(obj): assert created_run.creating_job_id == scheduler_job.id + @pytest.mark.need_serialized_dag + def test_new_asset_triggered_dag_ignores_events_before_creation(self, session, dag_maker): + """A newly added asset-triggered Dag must not consume events that predate it. + + Reproduces issue #39456: a new consumer Dag scheduled on an asset that already has + historical events should only receive events that occurred after it started + consuming the asset, not the entire backlog since the first event. + """ + asset = Asset(uri="test://asset-historical", name="hist_asset", group="test_group") + + # Producer Dag + run that the asset events are sourced from. + with dag_maker(dag_id="historical-producer", start_date=timezone.utcnow(), session=session): + BashOperator(task_id="task", bash_command="echo 1", outlets=[asset]) + producer_run = dag_maker.create_dagrun(run_id="producer-run") + + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri)) + + # Consumer Dag created now; its schedule reference's created_at is the cut-off. + with dag_maker(dag_id="historical-consumer", schedule=[asset]): + pass + consumer_dag = dag_maker.dag + reference_created_at = session.scalar( + select(DagScheduleAssetReference.created_at).where( + DagScheduleAssetReference.dag_id == consumer_dag.dag_id + ) + ) + + def _make_event(timestamp): + return AssetEvent( + asset_id=asset_id, + source_task_id="task", + source_dag_id=producer_run.dag_id, + source_run_id=producer_run.run_id, + source_map_index=-1, + timestamp=timestamp, + ) + + old_event = _make_event(reference_created_at - timedelta(days=1)) + new_event = _make_event(reference_created_at + timedelta(seconds=1)) + session.add_all([old_event, new_event]) + # Trigger time after both events so neither is excluded by the upper bound. + session.add( + AssetDagRunQueue( + asset_id=asset_id, + target_dag_id=consumer_dag.dag_id, + created_at=reference_created_at + timedelta(hours=1), + ) + ) + session.flush() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) + with create_session() as session: + self.job_runner._create_dagruns_for_dags(session, session) + + created_run = session.scalars(select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)).one() + assert created_run.state == State.QUEUED + assert {e.id for e in created_run.consumed_asset_events} == {new_event.id} + @pytest.mark.need_serialized_dag def test_create_dag_runs_asset_alias_with_asset_event_attached(self, session, dag_maker): """ From dd61521012e3c26490b25576799bdb03f654aa8b Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Tue, 30 Jun 2026 23:38:48 +0300 Subject: [PATCH 2/2] Gate asset-triggered backlog replay behind the catchup flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bounding the first-run event window unconditionally removed any way for a newly added asset-triggered Dag to intentionally replay an asset's full history. Reusing the existing catchup flag — which already means "consider history when the Dag is newly added" — restores that choice: catchup off (the default) skips the backlog, catchup on replays it. --- airflow-core/newsfragments/68749.bugfix.rst | 2 +- .../src/airflow/jobs/scheduler_job_runner.py | 21 ++++++++++------ .../tests/unit/jobs/test_scheduler_job.py | 25 ++++++++++++------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/airflow-core/newsfragments/68749.bugfix.rst b/airflow-core/newsfragments/68749.bugfix.rst index 8e883753d527f..592b875b2f54b 100644 --- a/airflow-core/newsfragments/68749.bugfix.rst +++ b/airflow-core/newsfragments/68749.bugfix.rst @@ -1 +1 @@ -New asset-triggered Dags no longer consume historical asset events recorded before the Dag started scheduling on those assets; the first run is now bounded at the moment the schedule reference was created instead of reprocessing the entire backlog (#39456). +Asset-triggered Dags now honor ``catchup`` for historical asset events. With ``catchup`` off (the default), a newly added asset-triggered Dag no longer consumes events recorded before it started scheduling on those assets; its first run is bounded at the moment the schedule reference was created instead of reprocessing the entire backlog. With ``catchup`` on, the backlog is still replayed (#39456). diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 67c3bea9ddc86..6c36b2b6c1352 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2575,6 +2575,18 @@ def _create_dag_runs_asset_triggered( .cte() ) + # A first asset-triggered run has no previous run to floor the event window. With + # catchup off, floor it at when the Dag started scheduling on its assets so the + # backlog is skipped; with catchup on, only date.min applies and the backlog replays. + event_window_floor: list[Any] = [cte.c.previous_dag_run_run_after] + if not dag.catchup: + event_window_floor.append( + select(func.min(DagScheduleAssetReference.created_at)) + .where(DagScheduleAssetReference.dag_id == dag.dag_id) + .scalar_subquery() + ) + event_window_floor.append(date.min) + asset_events = list( session.scalars( select(AssetEvent) @@ -2592,14 +2604,7 @@ def _create_dag_runs_asset_triggered( ), ), AssetEvent.timestamp <= triggered_date, - AssetEvent.timestamp - > func.coalesce( - cte.c.previous_dag_run_run_after, - select(func.min(DagScheduleAssetReference.created_at)) - .where(DagScheduleAssetReference.dag_id == dag.dag_id) - .scalar_subquery(), - date.min, - ), + AssetEvent.timestamp > func.coalesce(*event_window_floor), ) .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc()) ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 1ad0e314d9717..eb864c6091432 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -5601,13 +5601,19 @@ def dict_from_obj(obj): assert created_run.creating_job_id == scheduler_job.id @pytest.mark.need_serialized_dag - def test_new_asset_triggered_dag_ignores_events_before_creation(self, session, dag_maker): - """A newly added asset-triggered Dag must not consume events that predate it. - - Reproduces issue #39456: a new consumer Dag scheduled on an asset that already has - historical events should only receive events that occurred after it started - consuming the asset, not the entire backlog since the first event. - """ + @pytest.mark.parametrize( + ("catchup", "expects_old_event"), + [ + pytest.param(False, False, id="catchup-off-ignores-backlog"), + pytest.param(True, True, id="catchup-on-consumes-backlog"), + ], + ) + def test_new_asset_triggered_dag_backlog_gated_by_catchup( + self, catchup, expects_old_event, session, dag_maker + ): + """Reproduces #39456: catchup gates whether a new asset-triggered Dag replays the + pre-creation backlog. With catchup off (the default) it only consumes events after it + started scheduling on the asset; with catchup on it replays the full history.""" asset = Asset(uri="test://asset-historical", name="hist_asset", group="test_group") # Producer Dag + run that the asset events are sourced from. @@ -5618,7 +5624,7 @@ def test_new_asset_triggered_dag_ignores_events_before_creation(self, session, d asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri)) # Consumer Dag created now; its schedule reference's created_at is the cut-off. - with dag_maker(dag_id="historical-consumer", schedule=[asset]): + with dag_maker(dag_id="historical-consumer", schedule=[asset], catchup=catchup): pass consumer_dag = dag_maker.dag reference_created_at = session.scalar( @@ -5657,7 +5663,8 @@ def _make_event(timestamp): created_run = session.scalars(select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)).one() assert created_run.state == State.QUEUED - assert {e.id for e in created_run.consumed_asset_events} == {new_event.id} + expected = {new_event.id} | ({old_event.id} if expects_old_event else set()) + assert {e.id for e in created_run.consumed_asset_events} == expected @pytest.mark.need_serialized_dag def test_create_dag_runs_asset_alias_with_asset_event_attached(self, session, dag_maker):