diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index a8781c9b9..2df3deca5 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -155,7 +155,8 @@ class Langfuse: base_url (Optional[str]): The Langfuse API base URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_BASE_URL environment variable. host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. - httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. + httpx_client (Optional[httpx.Client]): Custom synchronous httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. + async_httpx_client (Optional[httpx.AsyncClient]): Custom asynchronous httpx client for `client.async_api`. If not provided, a default async client will be created. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. @@ -179,7 +180,7 @@ class Langfuse: ) ``` should_export_span (Optional[Callable[[ReadableSpan], bool]]): Callback to decide whether to export a span. If omitted, Langfuse uses the default filter (Langfuse SDK spans, spans with `gen_ai.*` attributes, and known LLM instrumentation scopes). - additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well. + additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If `httpx_client` or `async_httpx_client` is provided, `additional_headers` must be set directly on your custom client as well. tracer_provider(Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees. Example: @@ -231,6 +232,7 @@ def __init__( host: Optional[str] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, + async_httpx_client: Optional[httpx.AsyncClient] = None, debug: bool = False, tracing_enabled: Optional[bool] = True, flush_at: Optional[int] = None, @@ -332,6 +334,7 @@ def __init__( flush_at=flush_at, flush_interval=flush_interval, httpx_client=httpx_client, + async_httpx_client=async_httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index dd2ee4a29..f6ee16810 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -55,6 +55,7 @@ def _create_client_from_instance( additional_headers=instance.additional_headers, tracer_provider=instance.tracer_provider, httpx_client=instance.httpx_client, + async_httpx_client=instance.async_httpx_client, ) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 45c90ad66..efbbbd3e9 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -90,6 +90,7 @@ def __new__( flush_at: Optional[int] = None, flush_interval: Optional[float] = None, httpx_client: Optional[httpx.Client] = None, + async_httpx_client: Optional[httpx.AsyncClient] = None, media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, @@ -123,6 +124,7 @@ def __new__( flush_at=flush_at, flush_interval=flush_interval, httpx_client=httpx_client, + async_httpx_client=async_httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, @@ -152,6 +154,7 @@ def _initialize_instance( flush_interval: Optional[float] = None, media_upload_thread_count: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, + async_httpx_client: Optional[httpx.AsyncClient] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, tracing_enabled: bool = True, @@ -218,6 +221,15 @@ def _initialize_instance( client_headers = additional_headers if additional_headers else {} self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers) + if async_httpx_client is not None: + self.async_httpx_client = async_httpx_client + else: + async_client_headers = additional_headers if additional_headers else {} + self.async_httpx_client = httpx.AsyncClient( + timeout=timeout, + headers=async_client_headers, + ) + self.api = LangfuseAPI( base_url=base_url, username=self.public_key, @@ -235,6 +247,7 @@ def _initialize_instance( x_langfuse_sdk_name="python", x_langfuse_sdk_version=langfuse_version, x_langfuse_public_key=self.public_key, + httpx_client=self.async_httpx_client, timeout=timeout, ) score_ingestion_client = LangfuseClient( diff --git a/tests/test_additional_headers_simple.py b/tests/test_additional_headers_simple.py index 47cc765bd..7de77e019 100644 --- a/tests/test_additional_headers_simple.py +++ b/tests/test_additional_headers_simple.py @@ -3,6 +3,8 @@ This module tests that additional headers are properly configured in the HTTP clients. """ +import asyncio + import httpx from langfuse._client.client import Langfuse @@ -115,6 +117,28 @@ def test_media_manager_uses_custom_httpx_client(self): assert langfuse._resources is not None assert langfuse._resources._media_manager._httpx_client is custom_client + def test_async_api_uses_custom_async_httpx_client(self): + """Test that async_api reuses the configured custom async httpx client.""" + custom_async_client = httpx.AsyncClient() + + try: + langfuse = Langfuse( + public_key="test-public-key", + secret_key="test-secret-key", + host="https://mock-host.com", + async_httpx_client=custom_async_client, + tracing_enabled=False, + ) + + assert langfuse._resources is not None + assert langfuse._resources.async_httpx_client is custom_async_client + assert ( + langfuse.async_api._client_wrapper.httpx_client.httpx_client + is custom_async_client + ) + finally: + asyncio.run(custom_async_client.aclose()) + def test_none_additional_headers_works(self): """Test that passing None for additional_headers works without errors.""" langfuse = Langfuse( diff --git a/tests/test_resource_manager.py b/tests/test_resource_manager.py index 72f9f7d7e..8a7f8c68b 100644 --- a/tests/test_resource_manager.py +++ b/tests/test_resource_manager.py @@ -1,5 +1,9 @@ """Test the LangfuseResourceManager and get_client() function.""" +import asyncio + +import httpx + from langfuse import Langfuse from langfuse._client.get_client import get_client from langfuse._client.resource_manager import LangfuseResourceManager @@ -94,3 +98,30 @@ def should_export_b(span): client_a.shutdown() client_b.shutdown() + + +def test_get_client_preserves_custom_async_httpx_client(): + """Test that get_client() preserves the custom async httpx client.""" + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + custom_async_client = httpx.AsyncClient() + + try: + Langfuse( + public_key="pk-async-client", + secret_key="sk-async-client", + async_httpx_client=custom_async_client, + tracing_enabled=False, + ) + retrieved_client = get_client() + + assert retrieved_client._resources is not None + assert retrieved_client._resources.async_httpx_client is custom_async_client + assert ( + retrieved_client.async_api._client_wrapper.httpx_client.httpx_client + is custom_async_client + ) + finally: + LangfuseResourceManager.reset() + asyncio.run(custom_async_client.aclose())