From eec9d3634b71c2afe259be6e5c0127a954a77d07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gonzalo=20Pe=C3=B1a-Castellanos?= Date: Tue, 30 Jun 2026 17:23:09 -0500 Subject: [PATCH] docs(amazon): add S3-compatible remote logging guide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Gonzalo Peña-Castellanos --- .../logging/s3-compatible-remote-logging.rst | 211 ++++++++++++++++++ .../example_s3_compatible_object_storage.py | 91 ++++++++ 2 files changed, 302 insertions(+) create mode 100644 providers/amazon/docs/logging/s3-compatible-remote-logging.rst create mode 100644 providers/amazon/tests/system/amazon/aws/example_s3_compatible_object_storage.py diff --git a/providers/amazon/docs/logging/s3-compatible-remote-logging.rst b/providers/amazon/docs/logging/s3-compatible-remote-logging.rst new file mode 100644 index 0000000000000..f4983320b4aed --- /dev/null +++ b/providers/amazon/docs/logging/s3-compatible-remote-logging.rst @@ -0,0 +1,211 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. 
_write-logs-s3-compatible: + +Use an S3-compatible object store for Airflow remote task logs +============================================================== + +The Amazon provider talks to any S3-compatible object store, not just Amazon S3. Because the +:ref:`S3 remote task handler <write-logs-amazon-s3>` issues standard S3 API calls, pointing the +``aws`` connection at a custom ``endpoint_url`` makes it write Airflow task logs to that +endpoint with no new provider and no core change. You use the ``s3://`` scheme in +``[logging]`` exactly as you would for Amazon S3, and the same connection also backs +``ObjectStoragePath("s3://...")`` for Dag data. + +Amazon S3 is the baseline. The same steps work against other services that expose an +S3-compatible API, for example Backblaze B2, Cloudflare R2, and MinIO. The only +per-provider differences are the endpoint URL, the region, and whether path-style addressing +is required. + +This recipe targets Airflow 3.x with ``apache-airflow-providers-amazon``. + +Prerequisites +------------- + +- A bucket for logs (private). The examples use ``$S3_BUCKET_NAME``. +- An access key and secret scoped to that bucket. Prefer a bucket-scoped key over an + account-wide one. +- ``apache-airflow-providers-amazon`` installed. For ``ObjectStoragePath`` you also need the + ``s3fs`` extra: ``pip install 'apache-airflow-providers-amazon[s3fs]'``. + +Every S3-compatible service issues an access key id and a secret access key. Map them onto the +AWS connection fields as follows. 
+ +============================ ================================= ============================================= +S3-compatible value Standardized env var AWS connection field +============================ ================================= ============================================= +Access key id ``S3_ACCESS_KEY_ID`` ``login`` (AWS access key id) +Secret access key ``S3_SECRET_ACCESS_KEY`` ``password`` (AWS secret access key) +Bucket name ``S3_BUCKET_NAME`` used in ``remote_base_log_folder`` +Region ``S3_REGION`` ``extra.region_name`` +S3 endpoint ``S3_ENDPOINT`` ``extra.endpoint_url`` +============================ ================================= ============================================= + +Find the endpoint and region for your bucket in your provider's console or CLI. For Amazon S3 +the endpoint is the default AWS endpoint and you can omit ``endpoint_url`` entirely; for other +S3-compatible services set ``endpoint_url`` to the provider's S3 endpoint, such as +``https://your-s3-endpoint.example.com``. The connection ``extra.endpoint_url`` must include +the ``https://`` scheme. + +Step 1: Create the connection pointing at your endpoint +------------------------------------------------------- + +Create an ``aws`` connection whose ``endpoint_url`` extra is your S3 endpoint. The Amazon +provider sends every S3 call to that endpoint instead of the AWS default, which is what makes +the S3 handler talk to your store. For Amazon S3 you can leave ``endpoint_url`` unset and the +provider uses the default AWS endpoint. + +Using an environment-variable connection (no secrets on the command +line): + +.. 
code-block:: bash + + export AIRFLOW_CONN_AWS_S3='{ + "conn_type": "aws", + "login": "'"$S3_ACCESS_KEY_ID"'", + "password": "'"$S3_SECRET_ACCESS_KEY"'", + "extra": { + "endpoint_url": "'"$S3_ENDPOINT"'", + "region_name": "'"$S3_REGION"'", + "config_kwargs": {"s3": {"addressing_style": "path"}} + } + }' + +The ``config_kwargs`` ``addressing_style: path`` selects path-style addressing +(``endpoint/bucket/key``). Amazon S3 accepts both styles; several S3-compatible services expect +path-style addressing, so set it when your provider requires it. + +The equivalent JSON when you create the connection in the UI (``Admin -> Connections``, +connection type ``Amazon Web Services``) or store it in a secrets backend: + +.. code-block:: json + + { + "conn_type": "aws", + "login": "your-access-key-id", + "password": "your-secret-access-key", + "extra": { + "endpoint_url": "https://your-s3-endpoint.example.com", + "region_name": "us-east-1", + "config_kwargs": {"s3": {"addressing_style": "path"}} + } + } + +Never hardcode the secret access key in a Dag, ``airflow.cfg``, or version control. Read it +from the environment or a secrets backend. + +Step 2: Enable remote logging to the bucket +------------------------------------------- + +Configure the ``[logging]`` section of ``airflow.cfg`` (or the equivalent +``AIRFLOW__LOGGING__*`` environment variables) so Airflow uploads task logs to the bucket +through the connection from Step 1: + +.. code-block:: ini + + [logging] + remote_logging = True + remote_base_log_folder = s3://my-bucket/logs + remote_log_conn_id = aws_s3 + # Server-side encryption headers are an Amazon S3 feature; leave this off for stores that + # do not support them. + encrypt_s3_logs = False + +The ``s3://`` scheme routes through the same S3 task handler used for Amazon S3 +(``S3TaskHandler``), which resolves ``S3Hook(aws_conn_id="aws_s3")`` and therefore inherits the +endpoint from the connection extra. 
Restart the scheduler, the API server, the triggerer, and +the workers so they pick up the new configuration. + +As environment variables: + +.. code-block:: bash + + export AIRFLOW__LOGGING__REMOTE_LOGGING=True + export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="s3://${S3_BUCKET_NAME}/logs" + export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_s3 + +The access key needs the equivalent of ``s3:ListBucket`` on the bucket and ``s3:GetObject`` / +``s3:PutObject`` on the log prefix. A key scoped to the bucket with read and write capabilities +covers this. + +Step 3: Verify +-------------- + +- Trigger any example Dag and let a task finish. +- Confirm the objects appear under ``s3://$S3_BUCKET_NAME/logs/`` in your provider's console + or with any S3 client (for example ``aws s3 ls s3://$S3_BUCKET_NAME/logs/ --endpoint-url + "$S3_ENDPOINT"``). +- Open the task log in the Airflow UI. The log is served from the remote store; a banner notes + that the log was read from the remote location. + +Reusing the connection for Dag data +----------------------------------- + +The same ``aws_s3`` connection backs ``ObjectStoragePath`` for reading and writing Dag data, +because ``ObjectStoragePath("s3://...")`` builds an ``s3fs`` filesystem from the connection and +inherits the same ``endpoint_url``: + +.. code-block:: python + + from airflow.sdk import ObjectStoragePath + + base = ObjectStoragePath("s3://aws_s3@my-bucket/airflow-demo/") + +See the example Dag ``example_s3_compatible_object_storage`` for an input -> transform -> +output pipeline against an S3-compatible store. + +Troubleshooting +--------------- + +- ``SignatureDoesNotMatch`` or ``403``: re-check that ``endpoint_url`` includes ``https://``, + that ``region_name`` matches the bucket's region, and that ``login`` / ``password`` hold the + access key id and secret access key for a key scoped to the bucket. +- Logs stay local only: confirm ``remote_logging = True`` and that the components were + restarted. 
``remote_base_log_folder`` must start with ``s3://``. +- ``ImportError`` for ``s3fs`` when using ``ObjectStoragePath``: install the extra with + ``pip install 'apache-airflow-providers-amazon[s3fs]'``. + +Trigger Dags from bucket event notifications +-------------------------------------------- + +Beyond logs and data, the same bucket can drive upload-triggered pipelines. Many S3-compatible +services can send a webhook when an object is created or deleted in a bucket, and Airflow +exposes a REST endpoint that creates a Dag run. Wiring the two together turns an upload into an +Airflow Dag run, with no polling sensor. + +The flow is: a client uploads an object to the bucket; the object store posts an +event-notification payload to an HTTPS webhook you control (for example a small function or API +gateway); that webhook authenticates to the Airflow REST API and triggers the Dag, passing the +object key in the run configuration. The endpoint is ``POST /api/v2/dags/{dag_id}/dagRuns`` +with a JSON body that accepts ``logical_date``, an optional ``dag_run_id``, and a ``conf`` +object. Put the object key from the notification into ``conf`` so the Dag knows which object to +process: + +.. code-block:: bash + + curl -X POST "$AIRFLOW_API/api/v2/dags/example_s3_compatible_object_storage/dagRuns" \ + -H "Authorization: Bearer $AIRFLOW_JWT" \ + -H "Content-Type: application/json" \ + -d '{"logical_date": "2026-01-01T00:00:00Z", "conf": {"object_key": "incoming/file.txt"}}' + +The triggered Dag reads ``conf["object_key"]`` and processes that object straight from the +store with ``ObjectStoragePath(f"s3://aws_s3@{bucket}/{object_key}")``, reusing the connection +from Step 1. Keep the webhook thin: validate the notification, mint or hold a short-lived +Airflow API token, and forward only the object key. This pattern fits ingestion pipelines such +as transcode on upload or index on upload, where work should start the moment data lands in the +bucket. 
diff --git a/providers/amazon/tests/system/amazon/aws/example_s3_compatible_object_storage.py b/providers/amazon/tests/system/amazon/aws/example_s3_compatible_object_storage.py new file mode 100644 index 0000000000000..83c7f9ba56cdb --- /dev/null +++ b/providers/amazon/tests/system/amazon/aws/example_s3_compatible_object_storage.py @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Dag: input -> transform -> output on an S3-compatible object store via ``ObjectStoragePath``. + +The Amazon provider talks to any S3-compatible object store, so ``ObjectStoragePath("s3://...")`` +reaches the store through the amazon provider once the ``aws`` connection points at its S3 +endpoint. Amazon S3 is the baseline; the same code works against other S3-compatible services +(for example Backblaze B2, Cloudflare R2, and MinIO). See the recipe "Use an +S3-compatible object store for Airflow remote task logs" for connection and ``[logging]`` setup. + +Set up an ``aws`` connection (default id ``aws_s3``) whose ``extra`` includes the S3 +``endpoint_url`` and ``region_name``. The bucket name comes from ``S3_BUCKET_NAME``. + +Requires the s3fs extra: ``pip install 'apache-airflow-providers-amazon[s3fs]'``. 
+""" + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.sdk import ObjectStoragePath, dag, task + +DAG_ID = "example_s3_compatible_object_storage" + +# Connection id and bucket are read from the environment so the example carries no secrets. +S3_CONN_ID = os.environ.get("S3_CONN_ID", "aws_s3") +S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "my-bucket") + +# ObjectStoragePath resolves the connection lazily, so building it at module scope is safe. +base = ObjectStoragePath(f"s3://{S3_CONN_ID}@{S3_BUCKET_NAME}/airflow-demo") + + +@dag( + schedule=None, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "s3-compatible", "object-storage"], +) +def example_s3_compatible_object_storage(): + """Write input to the object store, transform it, and write the output back.""" + + @task + def input_to_store() -> str: + """Write a raw input object to the store and return its path.""" + base.mkdir(exist_ok=True) + src = base / "input.txt" + src.write_text("s3\ncompatible\nobject\nstorage\n") + return str(src) + + @task + def transform(src_path: str) -> str: + """Read the input from the store, uppercase it, and write the result back.""" + src = ObjectStoragePath(src_path) + text = src.read_text() + dst = base / "output.txt" + dst.write_text(text.upper()) + return str(dst) + + @task + def output_from_store(dst_path: str) -> None: + """Read the transformed object back from the store to confirm the round-trip.""" + dst = ObjectStoragePath(dst_path) + print(dst.read_text()) + + output_from_store(transform(input_to_store())) + + +dag = example_s3_compatible_object_storage() + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst) +test_run = get_test_run(dag)