Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68749.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Asset-triggered Dags now honor ``catchup`` for historical asset events. With ``catchup`` off (the default), a newly added asset-triggered Dag no longer consumes events recorded before it started scheduling on those assets; its first run is bounded at the moment the schedule reference was created instead of reprocessing the entire backlog. With ``catchup`` on, the backlog is still replayed (#39456).
14 changes: 13 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,18 @@ def _create_dag_runs_asset_triggered(
.cte()
)

# A first asset-triggered run has no previous run to floor the event window. With
# catchup off, floor it at when the Dag started scheduling on its assets so the
# backlog is skipped; with catchup on, only date.min applies and the backlog replays.
event_window_floor: list[Any] = [cte.c.previous_dag_run_run_after]
if not dag.catchup:
event_window_floor.append(
select(func.min(DagScheduleAssetReference.created_at))
.where(DagScheduleAssetReference.dag_id == dag.dag_id)
.scalar_subquery()
)
event_window_floor.append(date.min)

asset_events = list(
session.scalars(
select(AssetEvent)
Expand All @@ -2592,7 +2604,7 @@ def _create_dag_runs_asset_triggered(
),
),
AssetEvent.timestamp <= triggered_date,
AssetEvent.timestamp > func.coalesce(cte.c.previous_dag_run_run_after, date.min),
AssetEvent.timestamp > func.coalesce(*event_window_floor),
)
.order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
)
Expand Down
123 changes: 99 additions & 24 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
AssetEvent,
AssetModel,
AssetPartitionDagRun,
DagScheduleAssetReference,
PartitionedAssetKeyLog,
)
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
Expand Down Expand Up @@ -5501,51 +5502,59 @@ def test_create_dag_runs_assets(self, session, dag_maker):

with dag_maker(dag_id="assets-1", start_date=timezone.utcnow(), session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset1])
dr = dag_maker.create_dagrun(
dr1 = dag_maker.create_dagrun(
run_id="run1",
logical_date=(DEFAULT_DATE + timedelta(days=100)),
data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + timedelta(days=11)),
)
dr2 = dag_maker.create_dagrun(
run_id="run2",
logical_date=(DEFAULT_DATE + timedelta(days=101)),
data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)),
)

asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset1.uri))

# Consumer Dags are created before the events, so the events fall within their window.
with dag_maker(dag_id="assets-consumer-multiple", schedule=[asset1, asset2]):
pass
dag2 = dag_maker.dag
with dag_maker(dag_id="assets-consumer-single", schedule=[asset1]):
pass
dag3 = dag_maker.dag

base = session.scalar(
select(DagScheduleAssetReference.created_at).where(
DagScheduleAssetReference.dag_id == dag3.dag_id
)
)
event1 = AssetEvent(
asset_id=asset1_id,
source_task_id="task",
source_dag_id=dr.dag_id,
source_run_id=dr.run_id,
source_dag_id=dr1.dag_id,
source_run_id=dr1.run_id,
source_map_index=-1,
timestamp=base + timedelta(seconds=1),
)
session.add(event1)

# Create a second event, creation time is more recent, but data interval is older
dr = dag_maker.create_dagrun(
run_id="run2",
logical_date=(DEFAULT_DATE + timedelta(days=101)),
data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)),
)

event2 = AssetEvent(
asset_id=asset1_id,
source_task_id="task",
source_dag_id=dr.dag_id,
source_run_id=dr.run_id,
source_dag_id=dr2.dag_id,
source_run_id=dr2.run_id,
source_map_index=-1,
timestamp=base + timedelta(seconds=2),
)
session.add(event2)

with dag_maker(dag_id="assets-consumer-multiple", schedule=[asset1, asset2]):
pass
dag2 = dag_maker.dag
with dag_maker(dag_id="assets-consumer-single", schedule=[asset1]):
pass
dag3 = dag_maker.dag
session.add_all([event1, event2])

session = dag_maker.session
session.add_all(
[
AssetDagRunQueue(asset_id=asset1_id, target_dag_id=dag2.dag_id),
AssetDagRunQueue(asset_id=asset1_id, target_dag_id=dag3.dag_id),
AssetDagRunQueue(
asset_id=asset1_id, target_dag_id=dag2.dag_id, created_at=base + timedelta(hours=1)
),
AssetDagRunQueue(
asset_id=asset1_id, target_dag_id=dag3.dag_id, created_at=base + timedelta(hours=1)
),
]
)
session.flush()
Expand Down Expand Up @@ -5591,6 +5600,72 @@ def dict_from_obj(obj):

assert created_run.creating_job_id == scheduler_job.id

@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
("catchup", "expects_old_event"),
[
pytest.param(False, False, id="catchup-off-ignores-backlog"),
pytest.param(True, True, id="catchup-on-consumes-backlog"),
],
)
def test_new_asset_triggered_dag_backlog_gated_by_catchup(
self, catchup, expects_old_event, session, dag_maker
):
"""Reproduces #39456: catchup gates whether a new asset-triggered Dag replays the
pre-creation backlog. With catchup off (the default) it only consumes events after it
started scheduling on the asset; with catchup on it replays the full history."""
asset = Asset(uri="test://asset-historical", name="hist_asset", group="test_group")

# Producer Dag + run that the asset events are sourced from.
with dag_maker(dag_id="historical-producer", start_date=timezone.utcnow(), session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset])
producer_run = dag_maker.create_dagrun(run_id="producer-run")

asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))

# Consumer Dag created now; its schedule reference's created_at is the cut-off.
with dag_maker(dag_id="historical-consumer", schedule=[asset], catchup=catchup):
pass
consumer_dag = dag_maker.dag
reference_created_at = session.scalar(
select(DagScheduleAssetReference.created_at).where(
DagScheduleAssetReference.dag_id == consumer_dag.dag_id
)
)

def _make_event(timestamp):
return AssetEvent(
asset_id=asset_id,
source_task_id="task",
source_dag_id=producer_run.dag_id,
source_run_id=producer_run.run_id,
source_map_index=-1,
timestamp=timestamp,
)

old_event = _make_event(reference_created_at - timedelta(days=1))
new_event = _make_event(reference_created_at + timedelta(seconds=1))
session.add_all([old_event, new_event])
# Trigger time after both events so neither is excluded by the upper bound.
session.add(
AssetDagRunQueue(
asset_id=asset_id,
target_dag_id=consumer_dag.dag_id,
created_at=reference_created_at + timedelta(hours=1),
)
)
session.flush()

scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
with create_session() as session:
self.job_runner._create_dagruns_for_dags(session, session)

created_run = session.scalars(select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)).one()
assert created_run.state == State.QUEUED
expected = {new_event.id} | ({old_event.id} if expects_old_event else set())
assert {e.id for e in created_run.consumed_asset_events} == expected

@pytest.mark.need_serialized_dag
def test_create_dag_runs_asset_alias_with_asset_event_attached(self, session, dag_maker):
"""
Expand Down
Loading