From 689377183a4c93937bb127808e8e9635487b7c7c Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Thu, 2 Jul 2026 02:52:36 +0800 Subject: [PATCH] [airflow-ctl/v0-1-test] Add airflowctl dags state command (#68564) (cherry picked from commit ecd6fd4ad24a88c8831c8a802d0ed020a64b5f44) Co-authored-by: Henry Chen --- .../test_airflowctl_commands.py | 4 +- airflow-ctl/docs/images/command_hashes.txt | 2 +- airflow-ctl/docs/images/output_dags.svg | 130 +++++++++--------- airflow-ctl/src/airflowctl/api/client.py | 3 +- airflow-ctl/src/airflowctl/api/operations.py | 21 ++- airflow-ctl/src/airflowctl/ctl/cli_config.py | 14 ++ .../airflowctl/ctl/commands/dag_command.py | 57 +++++++- .../src/airflowctl/ctl/help_texts.yaml | 1 + .../tests/airflow_ctl/api/test_client.py | 30 +++- .../tests/airflow_ctl/api/test_operations.py | 36 +++++ .../ctl/commands/test_dag_command.py | 117 ++++++++++++++++ docs/spelling_wordlist.txt | 2 + 12 files changed, 347 insertions(+), 70 deletions(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index e1cfc665804d9..2d14b1d66f5ae 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -82,8 +82,10 @@ def date_param(): "dags list-import-errors", "dags list-version example_bash_operator", "dags list-warning", - # Order of trigger and pause/unpause is important for test stability because state checked + # Order of trigger, state, and pause/unpause is important for test stability "dags trigger example_bash_operator --logical-date={date_param} --run-after={date_param}", + 'dags state example_bash_operator "manual__{date_param}"', + 'dags state example_bash_operator "{date_param}"', # Test trigger without logical-date (should default to now) "dags trigger example_bash_operator", "dags next-execution example_bash_operator", diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 53c93e7546d1e..764c9b7cb29a3 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -4,7 +4,7 @@ auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f connections:942f9f88cb908c28bf5c19159fc5065b -dags:6b38e6bcd491bc1941e7814b77e63bde +dags:5ad68174a1111563dff870a241914b30 dagrun:c32e0011aa9a845456c778786717208e jobs:a5b644c5da8889443bb40ee10b599270 pools:19efe105b9515ab1926ebcaf0e028d71 diff --git a/airflow-ctl/docs/images/output_dags.svg b/airflow-ctl/docs/images/output_dags.svg index 7f95b26bdbfe0..70f2cda4cd0ac 100644 --- a/airflow-ctl/docs/images/output_dags.svg +++ b/airflow-ctl/docs/images/output_dags.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl dags [-hCOMMAND... - -Perform Dags operations - -Positional Arguments: -COMMAND -deleteDelete a Dag by its ID -getRetrieve a Dag by its ID -get-detailsRetrieve detailed information for a Dag -get-import-errorRetrieve a Dag import error by its ID -get-statsRetrieve run statistics for one or more Dags -get-tagsList all tags used across Dags -get-versionRetrieve a specific version of a Dag -listList all Dags -list-import-errors -List all Dag import errors -list-versionList all versions of a Dag -list-warningList all Dag warnings -next-executionShow the next scheduled execution time for a Dag -pausePause a Dag -triggerTrigger a new Dag run -unpauseUnpause a Dag -updateUpdate properties of a Dag - -Options: --h--helpshow this help message and exit + + Usage:airflowctl dags [-hCOMMAND... + +Perform Dags operations + +Positional Arguments: +COMMAND +deleteDelete a Dag by its ID +getRetrieve a Dag by its ID +get-detailsRetrieve detailed information for a Dag +get-import-errorRetrieve a Dag import error by its ID +get-statsRetrieve run statistics for one or more Dags +get-tagsList all tags used across Dags +get-versionRetrieve a specific version of a Dag +listList all Dags +list-import-errors +List all Dag import errors +list-versionList all versions of a Dag +list-warningList all Dag warnings +next-executionShow the next scheduled execution time for a Dag +pausePause a Dag +stateGet the status of a Dag run +triggerTrigger a new Dag run +unpauseUnpause a Dag +updateUpdate properties of a Dag + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index b01200fac1c7f..de2cec4ea5f33 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -111,7 +111,8 @@ def get_json_error(response: httpx.Response): if err: # This part is used in integration tests to verify the error message # If you are updating here don't forget to update the airflow-ctl-tests - log.warning("Server error ", extra=dict(err.response.json())) + if not response.request.extensions.get("airflowctl_suppress_error_log"): + log.warning("Server error ", extra=dict(err.response.json())) raise err diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index e250b66e127dd..ef65d2b3e36e1 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -601,10 +601,15 @@ def trigger( class DagRunOperations(BaseOperations): """Dag run operations.""" - def get(self, dag_id: str, dag_run_id: str) -> DAGRunResponse | ServerResponseError: + def get( + self, dag_id: str, dag_run_id: str, *, suppress_error_log: bool = False + ) -> DAGRunResponse | ServerResponseError: """Get a dag run.""" try: - self.response = self.client.get(f"/dags/{dag_id}/dagRuns/{dag_run_id}") + self.response = self.client.get( + f"/dags/{dag_id}/dagRuns/{dag_run_id}", + extensions={"airflowctl_suppress_error_log": suppress_error_log}, + ) return DAGRunResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -616,6 +621,9 @@ def list( start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, dag_id: str | None = None, + logical_date_gte: datetime.datetime | None = None, + logical_date_lte: datetime.datetime | None = None, + order_by: str | None = None, ) -> DAGRunCollectionResponse | ServerResponseError: """ List dag runs (at most `limit` results). @@ -626,6 +634,9 @@ def list( end_date: Filter dag runs by end date (optional) limit: Limit the number of results returned dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~"). + logical_date_gte: Filter dag runs with a logical date greater than or equal to this value. + logical_date_lte: Filter dag runs with a logical date less than or equal to this value. + order_by: Order the results by the specified field. """ # Use "~" for all DAGs if dag_id is not specified if not dag_id: @@ -638,6 +649,12 @@ def list( params["start_date"] = start_date.isoformat() if end_date is not None: params["end_date"] = end_date.isoformat() + if logical_date_gte is not None: + params["logical_date_gte"] = logical_date_gte.isoformat() + if logical_date_lte is not None: + params["logical_date_lte"] = logical_date_lte.isoformat() + if order_by is not None: + params["order_by"] = order_by try: self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params) diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 11ff4542e01ef..d0901e795ef48 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -267,6 +267,11 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: type=str, help="The Dag ID of the Dag to pause or unpause", ) +ARG_LOGICAL_DATE_OR_RUN_ID = Arg( + flags=("logical_date_or_run_id",), + type=str, + help="The logical date with a timezone offset or run ID of the Dag run", +) ARG_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), @@ -977,6 +982,15 @@ def merge_commands( ARG_OUTPUT, ), ), + ActionCommand( + name="state", + help="Get the status of a Dag run", + func=lazy_load_command("airflowctl.ctl.commands.dag_command.state"), + args=( + ARG_DAG_ID, + ARG_LOGICAL_DATE_OR_RUN_ID, + ), + ), ActionCommand( name="unpause", help="Unpause a Dag", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py index 301821f9c2c3c..f0562891d115c 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py @@ -17,13 +17,16 @@ from __future__ import annotations +import datetime +import json import sys from typing import Literal import rich +from rich.text import Text from airflowctl.api.client import NEW_API_CLIENT, ClientKind, ServerResponseError, provide_api_client -from airflowctl.api.datamodels.generated import DAGPatchBody +from airflowctl.api.datamodels.generated import DAGPatchBody, DAGRunResponse from airflowctl.ctl.console_formatting import AirflowConsole @@ -103,3 +106,55 @@ def next_execution(args, api_client=NEW_API_CLIENT) -> dict | None: output=args.output, ) return result + + +def _parse_logical_date(value: str) -> datetime.datetime | None: + """Parse an ISO-formatted logical date.""" + try: + logical_date = datetime.datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + if logical_date.tzinfo is None: + raise SystemExit("Logical date must include a timezone offset") + return logical_date + + +def _get_dag_run_by_run_id_or_logical_date(api_client, dag_id: str, value: str) -> DAGRunResponse | None: + """Get a Dag run by run ID, falling back to an exact logical date match.""" + try: + return api_client.dag_runs.get(dag_id=dag_id, dag_run_id=value, suppress_error_log=True) + except ServerResponseError as e: + if e.response.status_code != 404: + raise + + if logical_date := _parse_logical_date(value): + response = api_client.dag_runs.list( + dag_id=dag_id, + logical_date_gte=logical_date, + logical_date_lte=logical_date, + order_by="-id", + limit=1, + ) + if response.dag_runs: + return response.dag_runs[0] + else: + api_client.dag_runs.list(dag_id=dag_id, limit=1) + return None + + +@provide_api_client(kind=ClientKind.CLI) +def state(args, api_client=NEW_API_CLIENT) -> None: + """Show the state and configuration of a Dag run.""" + dag_run = _get_dag_run_by_run_id_or_logical_date( + api_client=api_client, + dag_id=args.dag_id, + value=args.logical_date_or_run_id, + ) + if not dag_run: + rich.print("[yellow]No matching Dag run found.[/yellow]") + else: + state_value = getattr(dag_run.state, "value", dag_run.state) + if dag_run.conf: + rich.print(Text(f"{state_value}, {json.dumps(dag_run.conf)}")) + else: + rich.print(Text(state_value)) diff --git a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml index eb566a96b1fb8..8dfbb1eff51cb 100644 --- a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml +++ b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml @@ -64,6 +64,7 @@ dags: get-version: "Retrieve a specific version of a Dag" list-version: "List all versions of a Dag" list-warning: "List all Dag warnings" + state: "Get the status of a Dag run" trigger: "Trigger a new Dag run" dagrun: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_client.py b/airflow-ctl/tests/airflow_ctl/api/test_client.py index 56cab7f7a8edc..3ce4466efef51 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_client.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_client.py @@ -28,7 +28,14 @@ import time_machine from httpx import URL -from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password, get_client +from airflowctl.api.client import ( + Client, + ClientKind, + Credentials, + _bounded_get_new_password, + get_client, + get_json_error, +) from airflowctl.api.operations import ServerResponseError from airflowctl.exceptions import ( AirflowCtlCredentialNotFoundException, @@ -100,6 +107,27 @@ def handle_request(request: httpx.Request) -> httpx.Response: client.get("http://error") assert err.value.args == ("Client error message: {'detail': 'Not found'}",) + @pytest.mark.parametrize( + ("suppress_error_log", "expected_warning_count"), + [ + pytest.param(False, 1, id="logs-error"), + pytest.param(True, 0, id="suppresses-error-log"), + ], + ) + @patch("airflowctl.api.client.log.warning") + def test_error_log_suppression(self, mock_warning, suppress_error_log, expected_warning_count): + request = httpx.Request( + "GET", + "http://error", + extensions={"airflowctl_suppress_error_log": suppress_error_log}, + ) + response = httpx.Response(404, request=request, json={"detail": "Not found"}) + + with pytest.raises(ServerResponseError): + get_json_error(response) + + assert mock_warning.call_count == expected_warning_count + @pytest.mark.parametrize( ("base_url", "client_kind", "expected_base_url"), [ diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 52faecee73ea0..1ada5122854c2 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1187,6 +1187,20 @@ def handle_request(request: httpx.Request) -> httpx.Response: response = client.dag_runs.get(dag_id=self.dag_id, dag_run_id=self.dag_run_id) assert response == self.dag_run_response + @pytest.mark.parametrize("suppress_error_log", [False, True]) + def test_get_passes_error_log_suppression_extension(self, suppress_error_log): + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.extensions["airflowctl_suppress_error_log"] is suppress_error_log + return httpx.Response(200, json=json.loads(self.dag_run_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.dag_runs.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + suppress_error_log=suppress_error_log, + ) + assert response == self.dag_run_response + def test_list(self): def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns" @@ -1202,6 +1216,28 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.dag_run_collection_response + def test_list_with_logical_date_filters_and_order(self): + logical_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) + + def handle_request(request: httpx.Request) -> httpx.Response: + assert dict(request.url.params) == { + "limit": "1", + "logical_date_gte": logical_date.isoformat(), + "logical_date_lte": logical_date.isoformat(), + "order_by": "-id", + } + return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.dag_runs.list( + dag_id=self.dag_id, + logical_date_gte=logical_date, + logical_date_lte=logical_date, + order_by="-id", + limit=1, + ) + assert response == self.dag_run_collection_response + @pytest.mark.parametrize( ( "dag_id_input", diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py index 405eb030065f8..f5966f09c0f28 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py @@ -17,15 +17,24 @@ from __future__ import annotations import datetime +from unittest import mock +import httpx import pytest from airflowctl.api.client import ClientKind from airflowctl.api.datamodels.generated import DAGResponse +from airflowctl.api.operations import ServerResponseError from airflowctl.ctl import cli_parser from airflowctl.ctl.commands import dag_command +def _server_error(status_code: int) -> ServerResponseError: + request = httpx.Request("GET", "http://testserver/api/v2/dags/test_dag/dagRuns/test_run") + response = httpx.Response(status_code, request=request, json={"detail": "boom"}) + return ServerResponseError(message="boom", request=request, response=response) + + class TestDagCommands: parser = cli_parser.get_parser() dag_id = "test_dag" @@ -215,3 +224,111 @@ def test_next_execution_fail(self, api_client_maker): self.parser.parse_args(["dags", "next-execution", self.dag_id]), api_client=api_client, ) + + def test_state_by_run_id(self, capsys): + api_client = mock.MagicMock() + api_client.dag_runs.get.return_value = mock.MagicMock(state="success", conf={}) + + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, "test_run"]), + api_client=api_client, + ) + + assert capsys.readouterr().out.strip() == "success" + api_client.dag_runs.get.assert_called_once_with( + dag_id=self.dag_id, + dag_run_id="test_run", + suppress_error_log=True, + ) + api_client.dag_runs.list.assert_not_called() + + def test_state_by_logical_date(self, capsys): + api_client = mock.MagicMock() + logical_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) + api_client.dag_runs.get.side_effect = _server_error(404) + api_client.dag_runs.list.return_value.dag_runs = [ + mock.MagicMock(state="failed", conf={"reason": "[red]test[/red]"}) + ] + + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, logical_date.isoformat()]), + api_client=api_client, + ) + + assert capsys.readouterr().out.strip() == 'failed, {"reason": "[red]test[/red]"}' + api_client.dag_runs.list.assert_called_once_with( + dag_id=self.dag_id, + logical_date_gte=logical_date, + logical_date_lte=logical_date, + order_by="-id", + limit=1, + ) + + @pytest.mark.parametrize( + ("value", "expected_list_kwargs"), + [ + pytest.param("missing_run", {"dag_id": dag_id, "limit": 1}, id="run-id"), + pytest.param( + "2025-01-01T00:00:00+00:00", + { + "dag_id": dag_id, + "logical_date_gte": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc), + "logical_date_lte": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc), + "order_by": "-id", + "limit": 1, + }, + id="logical-date", + ), + ], + ) + @mock.patch("rich.print") + def test_state_missing_run_prints_message(self, mock_rich_print, value, expected_list_kwargs): + api_client = mock.MagicMock() + api_client.dag_runs.get.side_effect = _server_error(404) + api_client.dag_runs.list.return_value.dag_runs = [] + + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, value]), + api_client=api_client, + ) + + mock_rich_print.assert_called_once_with("[yellow]No matching Dag run found.[/yellow]") + api_client.dag_runs.list.assert_called_once_with(**expected_list_kwargs) + + def test_state_missing_dag_propagates_api_error(self): + api_client = mock.MagicMock() + api_client.dag_runs.get.side_effect = _server_error(404) + api_client.dag_runs.list.side_effect = error = _server_error(404) + + with pytest.raises(ServerResponseError) as ctx: + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, "missing_run"]), + api_client=api_client, + ) + + assert ctx.value is error + + def test_state_rejects_naive_logical_date(self): + api_client = mock.MagicMock() + api_client.dag_runs.get.side_effect = _server_error(404) + + with pytest.raises(SystemExit, match="Logical date must include a timezone offset"): + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, "2025-01-01T00:00:00"]), + api_client=api_client, + ) + + api_client.dag_runs.list.assert_not_called() + + def test_state_propagates_non_404_api_error(self): + api_client = mock.MagicMock() + api_client.dag_runs.get.side_effect = error = _server_error(500) + + with pytest.raises(ServerResponseError) as ctx: + dag_command.state( + self.parser.parse_args(["dags", "state", self.dag_id, "test_run"]), + api_client=api_client, + ) + + assert ctx.value is error + api_client.dag_runs.list.assert_not_called() diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 2ba6bf200d5be..f154d2d381845 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -720,6 +720,7 @@ Grpc gRPC grpc gsuite +gte Gunicorn gunicorn gz @@ -973,6 +974,7 @@ logstash longblob lookups lshift +lte lxml machineTypes macOS