Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ def date_param():
"dags list-import-errors",
"dags list-version example_bash_operator",
"dags list-warning",
# Order of trigger and pause/unpause is important for test stability because state checked
# Order of trigger, state, and pause/unpause is important for test stability
"dags trigger example_bash_operator --logical-date={date_param} --run-after={date_param}",
'dags state example_bash_operator "manual__{date_param}"',
'dags state example_bash_operator "{date_param}"',
# Test trigger without logical-date (should default to now)
"dags trigger example_bash_operator",
"dags next-execution example_bash_operator",
Expand Down
2 changes: 1 addition & 1 deletion airflow-ctl/docs/images/command_hashes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ auth:d79e9c7d00c432bdbcbc2a86e2e32053
backfill:74c8737b0a62a86ed3605fa9e6165874
config:a3d936cb15fe3b547bf6c82cf93d923f
connections:942f9f88cb908c28bf5c19159fc5065b
dags:6b38e6bcd491bc1941e7814b77e63bde
dags:5ad68174a1111563dff870a241914b30
dagrun:c32e0011aa9a845456c778786717208e
jobs:a5b644c5da8889443bb40ee10b599270
pools:19efe105b9515ab1926ebcaf0e028d71
Expand Down
130 changes: 67 additions & 63 deletions airflow-ctl/docs/images/output_dags.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion airflow-ctl/src/airflowctl/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def get_json_error(response: httpx.Response):
if err:
# This part is used in integration tests to verify the error message
# If you are updating here don't forget to update the airflow-ctl-tests
log.warning("Server error ", extra=dict(err.response.json()))
if not response.request.extensions.get("airflowctl_suppress_error_log"):
log.warning("Server error ", extra=dict(err.response.json()))
raise err


Expand Down
21 changes: 19 additions & 2 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,15 @@ def trigger(
class DagRunOperations(BaseOperations):
"""Dag run operations."""

def get(self, dag_id: str, dag_run_id: str) -> DAGRunResponse | ServerResponseError:
def get(
self, dag_id: str, dag_run_id: str, *, suppress_error_log: bool = False
) -> DAGRunResponse | ServerResponseError:
"""Get a dag run."""
try:
self.response = self.client.get(f"/dags/{dag_id}/dagRuns/{dag_run_id}")
self.response = self.client.get(
f"/dags/{dag_id}/dagRuns/{dag_run_id}",
extensions={"airflowctl_suppress_error_log": suppress_error_log},
)
return DAGRunResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
Expand All @@ -616,6 +621,9 @@ def list(
start_date: datetime.datetime | None = None,
end_date: datetime.datetime | None = None,
dag_id: str | None = None,
logical_date_gte: datetime.datetime | None = None,
logical_date_lte: datetime.datetime | None = None,
order_by: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
List dag runs (at most `limit` results).
Expand All @@ -626,6 +634,9 @@ def list(
end_date: Filter dag runs by end date (optional)
limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~").
logical_date_gte: Filter dag runs with a logical date greater than or equal to this value.
logical_date_lte: Filter dag runs with a logical date less than or equal to this value.
order_by: Order the results by the specified field.
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
Expand All @@ -638,6 +649,12 @@ def list(
params["start_date"] = start_date.isoformat()
if end_date is not None:
params["end_date"] = end_date.isoformat()
if logical_date_gte is not None:
params["logical_date_gte"] = logical_date_gte.isoformat()
if logical_date_lte is not None:
params["logical_date_lte"] = logical_date_lte.isoformat()
if order_by is not None:
params["order_by"] = order_by

try:
self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params)
Expand Down
14 changes: 14 additions & 0 deletions airflow-ctl/src/airflowctl/ctl/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]:
type=str,
help="The Dag ID of the Dag to pause or unpause",
)
ARG_LOGICAL_DATE_OR_RUN_ID = Arg(
flags=("logical_date_or_run_id",),
type=str,
help="The logical date with a timezone offset or run ID of the Dag run",
)

ARG_ACTION_ON_EXISTING_KEY = Arg(
flags=("-a", "--action-on-existing-key"),
Expand Down Expand Up @@ -977,6 +982,15 @@ def merge_commands(
ARG_OUTPUT,
),
),
ActionCommand(
name="state",
help="Get the status of a Dag run",
func=lazy_load_command("airflowctl.ctl.commands.dag_command.state"),
args=(
ARG_DAG_ID,
ARG_LOGICAL_DATE_OR_RUN_ID,
),
),
ActionCommand(
name="unpause",
help="Unpause a Dag",
Expand Down
57 changes: 56 additions & 1 deletion airflow-ctl/src/airflowctl/ctl/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

from __future__ import annotations

import datetime
import json
import sys
from typing import Literal

import rich
from rich.text import Text

from airflowctl.api.client import NEW_API_CLIENT, ClientKind, ServerResponseError, provide_api_client
from airflowctl.api.datamodels.generated import DAGPatchBody
from airflowctl.api.datamodels.generated import DAGPatchBody, DAGRunResponse
from airflowctl.ctl.console_formatting import AirflowConsole


Expand Down Expand Up @@ -103,3 +106,55 @@ def next_execution(args, api_client=NEW_API_CLIENT) -> dict | None:
output=args.output,
)
return result


def _parse_logical_date(value: str) -> datetime.datetime | None:
"""Parse an ISO-formatted logical date."""
try:
logical_date = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if logical_date.tzinfo is None:
raise SystemExit("Logical date must include a timezone offset")
return logical_date


def _get_dag_run_by_run_id_or_logical_date(api_client, dag_id: str, value: str) -> DAGRunResponse | None:
"""Get a Dag run by run ID, falling back to an exact logical date match."""
try:
return api_client.dag_runs.get(dag_id=dag_id, dag_run_id=value, suppress_error_log=True)
except ServerResponseError as e:
if e.response.status_code != 404:
raise

if logical_date := _parse_logical_date(value):
response = api_client.dag_runs.list(
dag_id=dag_id,
logical_date_gte=logical_date,
logical_date_lte=logical_date,
order_by="-id",
limit=1,
)
if response.dag_runs:
return response.dag_runs[0]
else:
api_client.dag_runs.list(dag_id=dag_id, limit=1)
return None


@provide_api_client(kind=ClientKind.CLI)
def state(args, api_client=NEW_API_CLIENT) -> None:
"""Show the state and configuration of a Dag run."""
dag_run = _get_dag_run_by_run_id_or_logical_date(
api_client=api_client,
dag_id=args.dag_id,
value=args.logical_date_or_run_id,
)
if not dag_run:
rich.print("[yellow]No matching Dag run found.[/yellow]")
else:
state_value = getattr(dag_run.state, "value", dag_run.state)
if dag_run.conf:
rich.print(Text(f"{state_value}, {json.dumps(dag_run.conf)}"))
else:
rich.print(Text(state_value))
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/ctl/help_texts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ dags:
get-version: "Retrieve a specific version of a Dag"
list-version: "List all versions of a Dag"
list-warning: "List all Dag warnings"
state: "Get the status of a Dag run"
trigger: "Trigger a new Dag run"

dagrun:
Expand Down
30 changes: 29 additions & 1 deletion airflow-ctl/tests/airflow_ctl/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
import time_machine
from httpx import URL

from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password, get_client
from airflowctl.api.client import (
Client,
ClientKind,
Credentials,
_bounded_get_new_password,
get_client,
get_json_error,
)
from airflowctl.api.operations import ServerResponseError
from airflowctl.exceptions import (
AirflowCtlCredentialNotFoundException,
Expand Down Expand Up @@ -100,6 +107,27 @@ def handle_request(request: httpx.Request) -> httpx.Response:
client.get("http://error")
assert err.value.args == ("Client error message: {'detail': 'Not found'}",)

@pytest.mark.parametrize(
("suppress_error_log", "expected_warning_count"),
[
pytest.param(False, 1, id="logs-error"),
pytest.param(True, 0, id="suppresses-error-log"),
],
)
@patch("airflowctl.api.client.log.warning")
def test_error_log_suppression(self, mock_warning, suppress_error_log, expected_warning_count):
request = httpx.Request(
"GET",
"http://error",
extensions={"airflowctl_suppress_error_log": suppress_error_log},
)
response = httpx.Response(404, request=request, json={"detail": "Not found"})

with pytest.raises(ServerResponseError):
get_json_error(response)

assert mock_warning.call_count == expected_warning_count

@pytest.mark.parametrize(
("base_url", "client_kind", "expected_base_url"),
[
Expand Down
36 changes: 36 additions & 0 deletions airflow-ctl/tests/airflow_ctl/api/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,20 @@ def handle_request(request: httpx.Request) -> httpx.Response:
response = client.dag_runs.get(dag_id=self.dag_id, dag_run_id=self.dag_run_id)
assert response == self.dag_run_response

@pytest.mark.parametrize("suppress_error_log", [False, True])
def test_get_passes_error_log_suppression_extension(self, suppress_error_log):
def handle_request(request: httpx.Request) -> httpx.Response:
assert request.extensions["airflowctl_suppress_error_log"] is suppress_error_log
return httpx.Response(200, json=json.loads(self.dag_run_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
response = client.dag_runs.get(
dag_id=self.dag_id,
dag_run_id=self.dag_run_id,
suppress_error_log=suppress_error_log,
)
assert response == self.dag_run_response

def test_list(self):
def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns"
Expand All @@ -1202,6 +1216,28 @@ def handle_request(request: httpx.Request) -> httpx.Response:
)
assert response == self.dag_run_collection_response

def test_list_with_logical_date_filters_and_order(self):
logical_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)

def handle_request(request: httpx.Request) -> httpx.Response:
assert dict(request.url.params) == {
"limit": "1",
"logical_date_gte": logical_date.isoformat(),
"logical_date_lte": logical_date.isoformat(),
"order_by": "-id",
}
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
response = client.dag_runs.list(
dag_id=self.dag_id,
logical_date_gte=logical_date,
logical_date_lte=logical_date,
order_by="-id",
limit=1,
)
assert response == self.dag_run_collection_response

@pytest.mark.parametrize(
(
"dag_id_input",
Expand Down
Loading