Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ labelPRBasedOnFilePath:

area:Logging:
- airflow-core/src/airflow/config_templates/airflow_local_settings.py
- airflow-core/tests/unit/core/test_logging_config.py
- shared/logging/**/*
- airflow-core/src/airflow/utils/log/**/*
- airflow-core/docs/administration-and-deployment/logging-monitoring/logging-*.rst
- airflow-core/tests/unit/utils/log/**/*
Expand Down
11 changes: 9 additions & 2 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ dependencies = [
"apache-airflow-providers-common-sql>=1.26.0",
"apache-airflow-providers-smtp>=2.0.2",
"apache-airflow-providers-standard>=0.4.0",
# Start of shared logging dependencies
"msgspec>=0.19.0",
"pygtrie>=2.5.0",
"structlog>=25.4.0",
# End of shared logging dependencies
]


Expand Down Expand Up @@ -211,8 +216,9 @@ exclude = [
]

[tool.hatch.build.targets.sdist.force-include]
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/logging/src/airflow_shared/logging" = "src/airflow/_shared/logging"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"

[tool.hatch.build.targets.custom]
path = "./hatch_build.py"
Expand Down Expand Up @@ -280,6 +286,7 @@ apache-airflow-devel-common = { workspace = true }

[tool.airflow]
shared_distributions = [
"apache-airflow-shared-timezones",
"apache-airflow-shared-logging",
"apache-airflow-shared-secrets-masker",
"apache-airflow-shared-timezones",
]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/logging
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
},
"handlers": {
"console": {
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
"class": "logging.StreamHandler",
# "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
"formatter": "airflow_coloured",
"stream": "sys.stdout",
"filters": ["mask_secrets_core"],
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

from __future__ import annotations

import logging
import traceback
from typing import TYPE_CHECKING, NamedTuple, TypeVar

import structlog
from sqlalchemy import delete, func, insert, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, load_only
Expand Down Expand Up @@ -73,7 +73,7 @@

AssetT = TypeVar("AssetT", bound=BaseAsset)

log = logging.getLogger(__name__)
log = structlog.get_logger(__name__)


def _create_orm_dags(
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ def _get_logger_for_dag_file(self, dag_file: DagFileInfo):
log_file = init_log_file(log_filename)
logger_filehandle = log_file.open("ab")
underlying_logger = structlog.BytesLogger(logger_filehandle)
processors = logging_processors(enable_pretty_log=False)[0]
processors = logging_processors(json_output=True)
return structlog.wrap_logger(
underlying_logger, processors=processors, logger_name="processor"
).bind(), logger_filehandle
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@
from airflow.utils.types import DagRunTriggeredByType, DagRunType

if TYPE_CHECKING:
import logging
from types import FrameType

from pendulum.datetime import DateTime
from sqlalchemy.orm import Query, Session

from airflow._shared.logging.types import Logger
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_utils import ExecutorName
from airflow.models.mappedoperator import MappedOperator
Expand Down Expand Up @@ -189,7 +189,7 @@ def __init__(
job: Job,
num_runs: int = conf.getint("scheduler", "num_runs"),
scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"),
log: logging.Logger | None = None,
log: Logger | None = None,
):
super().__init__(job)
self.num_runs = num_runs
Expand Down
18 changes: 5 additions & 13 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from collections import deque
from collections.abc import Generator, Iterable
from contextlib import suppress
from datetime import datetime
from socket import socket
from traceback import format_exception
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
Expand Down Expand Up @@ -697,7 +696,7 @@ def _process_log_messages_from_subprocess(self) -> Generator[None, bytes | bytea

from airflow.sdk.log import logging_processors

processors, _ = logging_processors(enable_pretty_log=False)
processors = logging_processors(json_output=True)

def get_logger(trigger_id: int) -> WrappedLogger:
# TODO: Is a separate dict worth it, or should we make `self.running_triggers` a dict?
Expand All @@ -723,16 +722,6 @@ def get_logger(trigger_id: int) -> WrappedLogger:
# Log message about the TriggerRunner itself -- just output it
log = fallback_log

if ts := event.get("timestamp"):
# We use msgspec to decode the timestamp as it does it orders of magnitude quicker than
# datetime.strptime
#
# We remove the timezone info here, as the json encoding has `+00:00`, and since the log came
# from a subprocess we know that the timezone of the log message is the same, so having some
# messages include tz (from subprocess) but others not (ones from supervisor process) is
# confusing.
event["timestamp"] = msgspec.json.decode(f'"{ts}"', type=datetime).replace(tzinfo=None)

if exc := event.pop("exception", None):
# TODO: convert the dict back to a pretty stack trace
event["error_detail"] = exc
Expand Down Expand Up @@ -774,7 +763,10 @@ def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
return async_to_sync(self.asend)(msg)

async def _aread_frame(self):
len_bytes = await self._async_reader.readexactly(4)
try:
len_bytes = await self._async_reader.readexactly(4)
except ConnectionResetError:
asyncio.current_task().cancel("Supervisor closed")
length = int.from_bytes(len_bytes, byteorder="big")
if length >= 2**32:
raise OverflowError(f"Refusing to receive messages larger than 4GiB {length=}")
Expand Down
21 changes: 6 additions & 15 deletions airflow-core/src/airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import logging
import warnings
from importlib import import_module
from logging.config import dictConfig
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
Expand Down Expand Up @@ -86,33 +85,25 @@ def load_logging_config() -> tuple[dict[str, Any], str]:


def configure_logging():
from airflow._shared.logging.structlog import configure_logging

logging_config, logging_class_path = load_logging_config()
try:
# Ensure that the password masking filter is applied to the 'task' handler
# no matter what the user did.
if "filters" in logging_config and "mask_secrets" in logging_config["filters"]:
# But if they replace the logging config _entirely_, don't try to set this, it won't work
task_handler_config = logging_config["handlers"]["task"]

task_handler_config.setdefault("filters", [])

if "mask_secrets" not in task_handler_config["filters"]:
task_handler_config["filters"].append("mask_secrets")

level: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
# Try to init logging
dictConfig(logging_config)
configure_logging(log_level=level, stdlib_config=logging_config)
except (ValueError, KeyError) as e:
log.error("Unable to load the config, contains a configuration error.")
# When there is an error in the config, escalate the exception
# otherwise Airflow would silently fall back on the default config
raise e

validate_logging_config(logging_config)
validate_logging_config()

return logging_class_path


def validate_logging_config(logging_config):
def validate_logging_config():
"""Validate the provided Logging Config."""
# Now lets validate the other logging-related settings
task_log_reader = conf.get("logging", "task_log_reader")
Expand Down
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,26 @@ def _wrapper(*args, **kwargs):
if args[0].verbose:
f(*args, **kwargs)
else:
from airflow._shared.logging.structlog import respect_stdlib_disable

with warnings.catch_warnings():
warnings.simplefilter("ignore")
logging.disable(logging.CRITICAL)

def drop(*_, **__):
    # Stand-in code object swapped into respect_stdlib_disable.__code__ below:
    # it unconditionally raises structlog's DropEvent so every structlog event
    # is discarded, mirroring the stdlib logging.disable(logging.CRITICAL)
    # suppression applied just above.  Signature (*_, **__) deliberately
    # accepts anything so the code object is compatible with the original
    # processor's call shape.
    from structlog import DropEvent

    raise DropEvent()

old_fn = respect_stdlib_disable.__code__
respect_stdlib_disable.__code__ = drop.__code__
try:
f(*args, **kwargs)
finally:
# logging output again depends on the effective
# levels of individual loggers
logging.disable(logging.NOTSET)
respect_stdlib_disable.__code__ = old_fn

return cast("T", _wrapper)

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow import settings
from airflow._shared.timezones import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,7 +69,7 @@ def set_context(self, filename):
self._symlink_latest_log_directory()
self._cur_date = datetime.today()

return DISABLE_PROPOGATE
return SetContextPropagate.DISABLE_PROPAGATE

def emit(self, record):
if self.handler is not None:
Expand Down
7 changes: 6 additions & 1 deletion airflow-core/src/airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import logging
import os
import time
from collections.abc import Generator, Iterator
from datetime import datetime, timezone
Expand All @@ -25,7 +26,7 @@

from airflow.configuration import conf
from airflow.utils.helpers import render_log_filename
from airflow.utils.log.file_task_handler import StructuredLogMessage
from airflow.utils.log.file_task_handler import FileTaskHandler, StructuredLogMessage
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -168,6 +169,10 @@ def handlers():
yield from logging.getLogger("airflow.task").handlers
yield from logging.getLogger().handlers

fallback = FileTaskHandler(os.devnull)
fallback.name = task_log_reader
yield fallback

return next((h for h in handlers() if h.name == task_log_reader), None)

@property
Expand Down
16 changes: 10 additions & 6 deletions airflow-core/src/airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
from logging import Handler, StreamHandler
from typing import IO, TYPE_CHECKING, Any, TypeVar, cast

import structlog

if TYPE_CHECKING:
from logging import Logger
from airflow._shared.logging.types import Logger

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
Expand Down Expand Up @@ -67,7 +69,7 @@ def remove_escape_codes(text: str) -> str:
class LoggingMixin:
"""Convenience super-class to have a logger configured with the class name."""

_log: logging.Logger | None = None
_log: Logger | None = None

# Parent logger used by this class. It should match one of the loggers defined in the
# `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
Expand Down Expand Up @@ -111,7 +113,7 @@ def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
log_config_logger_name=obj._log_config_logger_name,
class_logger_name=obj._logger_name,
)
obj._log = logging.getLogger(logger_name)
obj._log = structlog.get_logger(logger_name)
return obj._log

@classmethod
Expand All @@ -124,9 +126,7 @@ def log(self) -> Logger:
"""Return a logger."""
return LoggingMixin._get_log(self, self.__class__)

def _set_context(self, context):
if context is not None:
set_context(self.log, context)
def _set_context(self, context): ...


class ExternalLoggingMixin(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -286,6 +286,10 @@ def set_context(logger, value):
:param logger: logger
:param value: value to set
"""
if not isinstance(logger, logging.Logger):
# This fn doesn't make sense for structlog based handlers
return

while logger:
orig_propagate = logger.propagate
for handler in logger.handlers:
Expand Down
11 changes: 7 additions & 4 deletions airflow-core/src/airflow/utils/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@
import logging
from collections.abc import Callable
from inspect import signature
from typing import TypeVar, overload
from typing import TYPE_CHECKING, TypeVar, overload

from sqlalchemy.exc import DBAPIError

from airflow.configuration import conf

if TYPE_CHECKING:
from airflow._shared.logging.types import Logger

F = TypeVar("F", bound=Callable)

MAX_DB_RETRIES = conf.getint("database", "max_db_retries", fallback=3)


def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: logging.Logger | None = None, **kwargs):
def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: Logger | None = None, **kwargs):
"""Return Tenacity Retrying object with project specific default."""
import tenacity

Expand All @@ -43,8 +46,8 @@ def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: logging.Logge
reraise=True,
**kwargs,
)
if logger and isinstance(logger, logging.Logger):
retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG, True)
if logger is not None:
retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG, True) # type: ignore[arg-type]

return tenacity.Retrying(**retry_kwargs)

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/integration/otel/dags/otel_test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def task1(ti):
tracer_provider = otel_task_tracer.get_otel_tracer_provider()

if context_carrier is not None:
logger.info("Found ti.context_carrier: %s.", context_carrier)
logger.info("Found ti.context_carrier: %s.", str(context_carrier))
logger.info("Extracting the span context from the context_carrier.")
parent_context = otel_task_tracer.extract(context_carrier)
with otel_task_tracer.start_child_span(
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/tests/integration/otel/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,14 @@ def print_ti_output_for_dag_run(dag_id: str, run_id: str):
for filename in files:
if filename.endswith(".log"):
full_path = os.path.join(root, filename)
log.info("\n===== LOG FILE: %s - START =====\n", full_path)
print("\n===== LOG FILE: %s - START =====\n", full_path)
try:
with open(full_path) as f:
log.info(f.read())
print(f.read())
except Exception as e:
log.error("Could not read %s: %s", full_path, e)

log.info("\n===== END =====\n")
print("\n===== END =====\n")


@pytest.mark.integration("redis")
Expand Down
Loading