Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/69205.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the mapped XCom sequence-slice API returning incorrect rows for crossed-bound slices (where ``stop`` comes before ``start``). Such slices now return an empty result, matching Python slicing, instead of emitting a negative SQL ``LIMIT`` -- which silently returned wrong rows on SQLite and failed the request on PostgreSQL/MySQL.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,22 @@ class GetXComSliceFilterParams(BaseModel):
include_prior_dates: bool = False


def _sliced_or_empty(query: Select, low: int, high: int) -> Select:
"""
Apply ``.slice(low, high)`` but return no rows when the bounds are crossed.

``Select.slice(low, high)`` compiles to ``OFFSET low LIMIT (high - low)`` and does not
clamp a negative limit. A crossed slice (``high <= low``), which Python evaluates to an
empty sequence, would otherwise emit a negative SQL ``LIMIT`` -- silently returning rows
from ``OFFSET`` onward on SQLite (negative ``LIMIT`` means "no limit") and raising on
PostgreSQL/MySQL. By this point the query is already ordered and step handling is applied
separately, so each slice is an ascending window and ``high <= low`` is exactly the empty case.
"""
if high <= low:
return query.limit(0)
return query.slice(low, high)


@router.get(
"/{dag_id}/{run_id}/{task_id}/{key:path}/slice",
description="Get XCom values from a mapped task by sequence slice",
Expand Down Expand Up @@ -235,9 +251,9 @@ def get_mapped_xcom_by_slice(
if stop < 0:
stop += get_query_count(query, session=session)
if step >= 0:
query = query.slice(start, stop)
query = _sliced_or_empty(query, start, stop)
else:
query = query.slice(stop + 1, start + 1)
query = _sliced_or_empty(query, stop + 1, start + 1)
else:
query = query.order_by(XComModel.map_index.desc())
step = -step
Expand All @@ -250,9 +266,9 @@ def get_mapped_xcom_by_slice(
if stop >= 0:
stop -= get_query_count(query, session=session)
if step > 0:
query = query.slice(-1 - start, -1 - stop)
query = _sliced_or_empty(query, -1 - start, -1 - stop)
else:
query = query.slice(-stop, -start)
query = _sliced_or_empty(query, -stop, -start)

values = [row.value for row in session.execute(query.with_only_columns(XComModel.value)).all()]
if step != 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ def __init__(self, *, x, **kwargs):
pytest.param(slice(-1, None, -1), id="-1::-1"),
pytest.param(slice(-2, -1, None), id="-2:-1"),
pytest.param(slice(-1, -3, -1), id="-1:-3:-1"),
# Crossed-bound slices (stop before start) must return [] rather than
# emitting a negative SQL LIMIT; one case per .slice() branch.
pytest.param(slice(2, 1, None), id="2:1"),
pytest.param(slice(0, 1, -1), id="0:1:-1"),
pytest.param(slice(-2, -1, -1), id="-2:-1:-1"),
pytest.param(slice(-1, -2, None), id="-1:-2"),
],
)
def test_xcom_get_with_slice(self, client, dag_maker, session, key):
Expand Down
Loading