Skip to content

Move secrets_masker over to airflow_shared distribution#54449

Merged
amoghrajesh merged 59 commits intoapache:mainfrom
astronomer:move-secrets-masker-to-shared
Aug 23, 2025
Merged

Move secrets_masker over to airflow_shared distribution#54449
amoghrajesh merged 59 commits intoapache:mainfrom
astronomer:move-secrets-masker-to-shared

Conversation

@amoghrajesh
Copy link
Copy Markdown
Contributor

@amoghrajesh amoghrajesh commented Aug 13, 2025

closes: #54397

Why

As part of #51545, we are trying to avoid code duplication between airflow-core and task-sdk. The secrets masking functionality is widely used so that it can be moved to a shared library. This follows the pattern set by the timezone utilities migration(#53149) and ensures a single source of truth for secrets masking logic across Airflow codebase.

High level changes made

1. Created shared library structure

  • New directory: shared/secrets_masker/ with standard shared library layout
  • Project configuration: shared/secrets_masker/pyproject.toml defining apache-airflow-shared-secrets_masker
  • Source code: Moved from task-sdk/src/airflow/sdk/execution_time/secrets_masker.py to shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py

2. Updated consuming packages

  • Workspace configuration: Added shared/secrets_masker to root pyproject.toml workspace members
  • Public API modules: Created task-sdk/src/airflow/sdk/secrets_masker.py that re-exports shared functionality
  • Import statements: Updated imports across airflow-core and task-sdk to use new shared paths

4. How the updates were made:

  • Pre-commit hooks: Leveraged existing check-shared-distributions-structure and check-shared-distributions-usage hooks
  • Symlinks: Automatic creation of symlinks from airflow/_shared/secrets_masker and airflow/sdk/_shared/secrets_masker
  • Force-include: Automated addition to pyproject.toml files for wheel packaging

Sanity tests

Symlink verification

# Verify symlinks exist and point to correct locations
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ ls -la airflow-core/src/airflow/_shared/secrets_masker
ls -la task-sdk/src/airflow/sdk/_shared/secrets_masker
lrwxr-xr-x@ 1 amoghdesai  staff  67 Aug 13 15:59 airflow-core/src/airflow/_shared/secrets_masker -> ../../../../shared/secrets_masker/src/airflow_shared/secrets_masker
lrwxr-xr-x@ 1 amoghdesai  staff  70 Aug 13 15:59 task-sdk/src/airflow/sdk/_shared/secrets_masker -> ../../../../../shared/secrets_masker/src/airflow_shared/secrets_masker


# Check symlink targets
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ readlink airflow-core/src/airflow/_shared/secrets_masker
readlink task-sdk/src/airflow/sdk/_shared/secrets_masker
../../../../shared/secrets_masker/src/airflow_shared/secrets_masker
../../../../../shared/secrets_masker/src/airflow_shared/secrets_masker

Shared library test in isolation

# Test shared library works independently
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ cd shared/secrets_masker
PYTHONPATH=src python -c "
from airflow_shared.secrets_masker.secrets_masker import SecretsMasker
filt = SecretsMasker()
filt.add_mask('password')
print('✅ Shared library works in isolation')
"
✅ Shared library works in isolation

Import validation

# Test imports work from consuming packages
(airflow) ➜  secrets_masker git:(move-secrets-masker-to-shared) ✗python -c "from airflow.sdk.secrets_masker import SecretsMasker; print('✅ task-sdk import works')""
python -c "from airflow._shared.secrets_masker.secrets_masker import SecretsMasker; print('✅ airflow-core import works')"
✅ task-sdk import works
✅ airflow-core import works

Some tests for peace of mind

DAG used for testing some cases:

from __future__ import annotations

import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable, Connection


def my_function() -> None:
    conn = Connection.get("my_connection")
    logging.getLogger(__name__).info(conn.password)
    print("via print", conn.password)


with DAG("subprocess_dag") as dag:
    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(task_id="py_func", python_callable=my_function)

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

Env set:

export AIRFLOW_CONN_MY_CONNECTION="mysql://testuser:testpassword123@localhost:3306/testdb
  1. Running a task through CLI: airflow tasks test subprocess_dag py_fu
[2025-08-21T07:52:11.216+0000] {dag.py:2329} INFO - created dagrun <DagRun subprocess_dag @ None: __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__, state:running, queued_at: None. run_type: manual>
[2025-08-21T07:52:11.222+0000] {dag.py:1293} INFO - [DAG TEST] starting task_id=py_func map_index=-1
[2025-08-21T07:52:11.680+0000] {dag.py:1296} INFO - [DAG TEST] running task <TaskInstance: subprocess_dag.py_func __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__ [None]>
2025-08-21 07:52:12 [debug    ] Starting task instance run     hostname=e2cc5b7dc023 pid=450 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 unixname=root
2025-08-21 07:52:12 [debug    ] Retrieved task instance details dag_id=subprocess_dag state=queued task_id=py_func ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info     ] Task started                   hostname=e2cc5b7dc023 previous_state=queued ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info     ] Task instance state updated    rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.497+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/run "HTTP/1.1 200 OK"
2025-08-21 07:52:12 [debug    ] Sending request                msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [debug    ] Received message from task runner [supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [info     ] Updating RenderedTaskInstanceFields field_count=3 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug    ] RenderedTaskInstanceFields updated successfully ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.509+0000] {_client.py:1025} INFO - HTTP Request: PUT http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/rtif "HTTP/1.1 201 Created"
2025-08-21 07:52:12 [debug    ] Sending request                msg=MaskSecret(value='***', name=None, type='MaskSecret')
2025-08-21 07:52:12 [debug    ] Received message from task runner (body omitted) [supervisor] msg=<class 'airflow.sdk.execution_time.comms.MaskSecret'>
[2025-08-21T07:52:12.513+0000] {subprocess_mask_var.py:13} INFO - ***
via print ***
[2025-08-21T07:52:12.513+0000] {python.py:218} INFO - Done. Returned value was: None
2025-08-21 07:52:12 [debug    ] Sending request                msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug    ] Received message from task runner [supervisor] msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug    ] Updating task instance state   new_state=success ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug    ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 try_number=0
2025-08-21 07:52:12 [info     ] Task instance state updated    new_state=success rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.522+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/state "HTTP/1.1 204 No Content"
2025-08-21 07:52:12 [debug    ] Running finalizers             [task] ti=RuntimeTaskInstance(id=UUID('0198cb9d-28d2-7800-82b0-7161dc72f022'), task_id='py_func', dag_id='subprocess_dag', run_id='__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__', try_number=0, dag_version_id=UUID('0198cb9b-d3d6-74f3-a561-083d897ed8cd'), map_index=-1, hostname='e2cc5b7dc023', context_carrier=None, task=<Task(PythonOperator): py_func>, max_tries=0, start_date=datetime.datetime(2025, 8, 21, 7, 52, 11, 880733, tzinfo=datetime.timezone.utc), end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), state=<TaskInstanceState.SUCCESS: 'success'>, is_mapped=False, rendered_map_index=None, log_url=None)

This is specifically to test the recent issue: 1f4c55c

  1. Running a trigger with a dag that will log using dag processor, root logger, structlog etc

Trigger:

from __future__ import annotations

import asyncio
import logging

import structlog

from airflow.sdk import Connection, Variable
from airflow.sdk.log import mask_secret
from airflow.triggers.base import BaseTrigger, TriggerEvent


print(f"{__file__=} loaded")


class CustomTrigger(BaseTrigger):
    async def run(self, **args):
        from asgiref.sync import sync_to_async

        await sync_to_async(Variable.set)("my_api_key", "password1")
        x = await sync_to_async(Variable.get)("my_api_key")

        logging.getLogger(__name__).info("my_api_key=%s", x)
        secret = "some-secret-value"
        await sync_to_async(mask_secret)(secret)
        await sync_to_async(mask_secret)("some-secret-value")

        logging.getLogger(__name__).info("after manual mask %s", secret)

        await structlog.get_logger().ainfo("Testing structlog", val=secret, api_key="abcdef", x=x)

        yield TriggerEvent({"Hi": "from trigger"})

    def serialize(self):
        return (
            f"{type(self).__module__}.{type(self).__qualname__}",
            {},
        )

DAG:

from airflow.exceptions import AirflowRescheduleException, TaskDeferred
from airflow.sdk import dag, task
from airflow.sdk import Variable
import logging

x = Variable.get("toplevel_api_key", default="secret_api")
print(f"{x=}")
logging.root.info("toplevel=%s", x)

@dag()
def trigger_a_gag():
    @task
    def trigger(event=None) -> None:
        if event:
            print(event)
        else:
            from triggera import CustomTrigger
            raise TaskDeferred(trigger=CustomTrigger(), method_name="execute")

    trigger()


trigger_a_gag()

image

Logs:

[2025-08-21, 13:24:55] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:24:55] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:24:55] INFO - toplevel=***: source="root"
[2025-08-21, 13:24:55] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - Pausing task as DEFERRED. : dag_id="trigger_a_gag": task_id="trigger": run_id="manual__2025-08-21T07:54:54.321189+00:00": source="task"
[2025-08-21, 13:24:57] INFO - trigger trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1) starting
[2025-08-21, 13:24:59] INFO - my_api_key=***: source="triggera"
[2025-08-21, 13:24:59] INFO - after manual mask ***: source="triggera"
[2025-08-21, 13:24:59] INFO - Testing structlog: val="***": api_key="***": x="***"
[2025-08-21, 13:24:59] INFO - Trigger fired event: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)": result="TriggerEvent<{'Hi': 'from trigger'}>"
[2025-08-21, 13:24:59] INFO - trigger completed: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)"
[2025-08-21, 13:25:00] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task"
[2025-08-21, 13:25:00] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:25:00] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:25:00] INFO - toplevel=***: source="root"
[2025-08-21, 13:25:00] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:25:00] ERROR - Task failed with exception: source="task"

TODO next

  • Update occurences of these imports in providers
  • Update these occurrences in docs if any

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

@kaxil / @potiuk thanks for your reviews, I have handled all / most of your comments, could you take a look when possible?

@potiuk added the CI job for this too

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

@amoghrajesh amoghrajesh requested review from kaxil and potiuk August 22, 2025 13:41
Copy link
Copy Markdown
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonderful!

Copy link
Copy Markdown
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments to be addressed but high-level is fine

@amoghrajesh
Copy link
Copy Markdown
Contributor Author

Good lord, that was a big one with changes in many areas including precommits etc! Thanks for your reviews @potiuk and @kaxil! The tooling is awesome (even more so with the improvements now) for shared library bootstrapping! Merging this and will be handling the open comments in #54859 and resolve it soon

@amoghrajesh amoghrajesh merged commit 7982542 into apache:main Aug 23, 2025
194 checks passed
@amoghrajesh amoghrajesh deleted the move-secrets-masker-to-shared branch August 23, 2025 05:18
@potiuk
Copy link
Copy Markdown
Member

potiuk commented Aug 23, 2025

Good lord, that was a big one with changes in many areas including precommits etc! Thanks for your reviews @potiuk and @kaxil! The tooling is awesome (even more so with the improvements now) for shared library bootstrapping! Merging this and will be handling the open comments in #54859 and resolve it soon

Indeed. I am very happy with the tooling :). And with #54863 we have small common test framework for the shared distributions as well :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

No open projects

Development

Successfully merging this pull request may close these issues.

Create a shared library for secrets_masker and move the module there

4 participants