diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py b/airflow-core/src/airflow/cli/commands/dag_processor_command.py index c866763338ff1..9f054eb846755 100644 --- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py @@ -22,7 +22,7 @@ from typing import Any from airflow.cli.commands.daemon_utils import run_command_with_daemon_option -from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing +from airflow.dag_processing.manager import DagFileProcessorManager from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner from airflow.jobs.job import Job, run_job from airflow.utils import cli as cli_utils @@ -50,7 +50,6 @@ def dag_processor(args): """Start Airflow Dag Processor Job.""" job_runner = _create_dag_processor_job_runner(args) - reload_configuration_for_dag_processing() run_command_with_daemon_option( args=args, process_name="dag-processor", diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 55cb41610b006..cf21c28f85ecd 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -21,7 +21,6 @@ import contextlib import functools -import importlib import inspect import logging import os @@ -35,7 +34,6 @@ from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass, field from datetime import datetime, timedelta -from importlib import import_module from operator import attrgetter, itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any, NamedTuple, cast @@ -47,7 +45,6 @@ from tabulate import tabulate from uuid6 import uuid7 -import airflow.models from airflow._shared.timezones import timezone from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI from airflow.configuration import conf @@ -1102,29 +1099,6 @@ def emit_metrics(self): ) -def reload_configuration_for_dag_processing(): - # Reload configurations and settings to avoid collision with parent process. - # Because this process may need custom configurations that cannot be shared, - # e.g. RotatingFileHandler. And it can cause connection corruption if we - # do not recreate the SQLA connection pool. - os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True" - os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False" - # Replicating the behavior of how logging module was loaded - # in logging_config.py - # TODO: This reloading should be removed when we fix our logging behaviour - # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the - # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded - # in a parent process (likely via resources shared in memory by the ORM libraries). - # This caused flaky tests in our CI for many months and has been discovered while - # iterating on https://github.com/apache/airflow/pull/19860 - # The issue that describes the problem and possible remediation is - # at https://github.com/apache/airflow/issues/19934 - importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0])) - importlib.reload(airflow.settings) - airflow.settings.initialize() - del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] - - def process_parse_results( run_duration: float, finish_time: datetime,