diff --git a/.env.example b/.env.example index b650308f..727672a7 100644 --- a/.env.example +++ b/.env.example @@ -5,6 +5,11 @@ POSTGRES_URL=POSTGRES_URL=postgresql://localhost:5432/apollo_dev SENTRY_DSN=YOUR-API-KEY-HERE GITHUB_TOKEN=KEY +# Langfuse observability +LANGFUSE_SECRET_KEY=sk-lf-... +LANGFUSE_PUBLIC_KEY=pk-lf-... +LANGFUSE_BASE_URL=https://cloud.langfuse.com + # HF_ACCESS_TOKEN=hf_YOUR-API-KEY-HERE # llama2 base # ZILLIZ_URI = https://in01-XXXXXXXXXXXXX.aws-us-west-2.vectordb.zillizcloud.com:XXXXX # ZILLIZ_TOKEN =db_admin:password (or ApiKey) diff --git a/poetry.lock b/poetry.lock index 51a3475a..d627b343 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -221,6 +221,18 @@ docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphi tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""] +[[package]] +name = "backoff" +version = "2.2.1" +description = "Function decoration for backoff and retry" +optional = false +python-versions = ">=3.7,<4.0" +groups = ["main"] +files = [ + {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"}, + {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, +] 
+ [[package]] name = "black" version = "26.3.1" @@ -569,7 +581,7 @@ files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] -markers = {main = "platform_system == \"Windows\" or sys_platform == \"win32\"", dev = "sys_platform == \"win32\""} +markers = {main = "sys_platform == \"win32\" or platform_system == \"Windows\"", dev = "sys_platform == \"win32\""} [[package]] name = "confection" @@ -924,6 +936,24 @@ test-downstream = ["aiobotocore (>=2.5.4,<3.0.0)", "dask[dataframe,test]", "moto test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard"] tqdm = ["tqdm"] +[[package]] +name = "googleapis-common-protos" +version = "1.73.0" +description = "Common protobufs used in Google APIs" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "googleapis_common_protos-1.73.0-py3-none-any.whl", hash = "sha256:dfdaaa2e860f242046be561e6d6cb5c5f1541ae02cfbcb034371aadb2942b4e8"}, + {file = "googleapis_common_protos-1.73.0.tar.gz", hash = "sha256:778d07cd4fbeff84c6f7c72102f0daf98fa2bfd3fa8bea426edc545588da0b5a"}, +] + +[package.dependencies] +protobuf = ">=3.20.2,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0" + +[package.extras] +grpc = ["grpcio (>=1.44.0,<2.0.0)"] + [[package]] name = "greenlet" version = "3.2.3" @@ -1208,6 +1238,30 @@ 
files = [ [package.extras] all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] +[[package]] +name = "importlib-metadata" +version = "8.7.1" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "importlib_metadata-8.7.1-py3-none-any.whl", hash = "sha256:5a1f80bf1daa489495071efbb095d75a634cf28a8bc299581244063b53176151"}, + {file = "importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb"}, +] + +[package.dependencies] +zipp = ">=3.20" + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=3.4)"] +perf = ["ipython"] +test = ["flufl.flake8", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +type = ["mypy (<1.19) ; platform_python_implementation == \"PyPy\"", "pytest-mypy (>=1.0.1)"] + [[package]] name = "iniconfig" version = "2.1.0" @@ -1546,6 +1600,29 @@ language-data = ">=1.2" build = ["build", "twine"] test = ["pytest", "pytest-cov"] +[[package]] +name = "langfuse" +version = "4.0.1" +description = "A client library for accessing langfuse" +optional = false +python-versions = "<4.0,>=3.10" +groups = ["main"] +files = [ + {file = "langfuse-4.0.1-py3-none-any.whl", hash = "sha256:e22f49ea31304f97fc31a97c014ba63baa8802d9568295d54f06b00b43c30524"}, + {file = "langfuse-4.0.1.tar.gz", hash = "sha256:40a6daf3ab505945c314246d5b577d48fcfde0a47e8c05267ea6bd494ae9608e"}, +] + +[package.dependencies] +backoff = ">=1.10.0" +httpx = ">=0.15.4,<1.0" +openai = ">=0.27.8" +opentelemetry-api = ">=1.33.1,<2.0.0" +opentelemetry-exporter-otlp-proto-http = ">=1.33.1,<2.0.0" +opentelemetry-sdk = ">=1.33.1,<2.0.0" +packaging = 
">=23.2,<26.0" +pydantic = ">=2,<3" +wrapt = ">=1.14,<2.0" + [[package]] name = "langsmith" version = "0.4.1" @@ -2379,6 +2456,164 @@ datalib = ["numpy (>=1)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"] realtime = ["websockets (>=13,<16)"] voice-helpers = ["numpy (>=2.0.2)", "sounddevice (>=0.5.1)"] +[[package]] +name = "opentelemetry-api" +version = "1.40.0" +description = "OpenTelemetry Python API" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_api-1.40.0-py3-none-any.whl", hash = "sha256:82dd69331ae74b06f6a874704be0cfaa49a1650e1537d4a813b86ecef7d0ecf9"}, + {file = "opentelemetry_api-1.40.0.tar.gz", hash = "sha256:159be641c0b04d11e9ecd576906462773eb97ae1b657730f0ecf64d32071569f"}, +] + +[package.dependencies] +importlib-metadata = ">=6.0,<8.8.0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-exporter-otlp-proto-common" +version = "1.40.0" +description = "OpenTelemetry Protobuf encoding" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_exporter_otlp_proto_common-1.40.0-py3-none-any.whl", hash = "sha256:7081ff453835a82417bf38dccf122c827c3cbc94f2079b03bba02a3165f25149"}, + {file = "opentelemetry_exporter_otlp_proto_common-1.40.0.tar.gz", hash = "sha256:1cbee86a4064790b362a86601ee7934f368b81cd4cc2f2e163902a6e7818a0fa"}, +] + +[package.dependencies] +opentelemetry-proto = "1.40.0" + +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.40.0" +description = "OpenTelemetry Collector Protobuf over HTTP Exporter" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_exporter_otlp_proto_http-1.40.0-py3-none-any.whl", hash = "sha256:a8d1dab28f504c5d96577d6509f80a8150e44e8f45f82cdbe0e34c99ab040069"}, + {file = "opentelemetry_exporter_otlp_proto_http-1.40.0.tar.gz", hash = "sha256:db48f5e0f33217588bbc00274a31517ba830da576e59503507c839b38fa0869c"}, +] + +[package.dependencies] 
+googleapis-common-protos = ">=1.52,<2.0" +opentelemetry-api = ">=1.15,<2.0" +opentelemetry-exporter-otlp-proto-common = "1.40.0" +opentelemetry-proto = "1.40.0" +opentelemetry-sdk = ">=1.40.0,<1.41.0" +requests = ">=2.7,<3.0" +typing-extensions = ">=4.5.0" + +[package.extras] +gcp-auth = ["opentelemetry-exporter-credential-provider-gcp (>=0.59b0)"] + +[[package]] +name = "opentelemetry-instrumentation" +version = "0.61b0" +description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation-0.61b0-py3-none-any.whl", hash = "sha256:92a93a280e69788e8f88391247cc530fd81f16f2b011979d4d6398f805cfbc63"}, + {file = "opentelemetry_instrumentation-0.61b0.tar.gz", hash = "sha256:cb21b48db738c9de196eba6b805b4ff9de3b7f187e4bbf9a466fa170514f1fc7"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.4,<2.0" +opentelemetry-semantic-conventions = "0.61b0" +packaging = ">=18.0" +wrapt = ">=1.0.0,<2.0.0" + +[[package]] +name = "opentelemetry-instrumentation-anthropic" +version = "0.53.3" +description = "OpenTelemetry Anthropic instrumentation" +optional = false +python-versions = "<4,>=3.10" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation_anthropic-0.53.3-py3-none-any.whl", hash = "sha256:f0ab1284783e7020316d03c42c1ed9cc07d62f8daffd5acfe92f3d59f29be87c"}, + {file = "opentelemetry_instrumentation_anthropic-0.53.3.tar.gz", hash = "sha256:dffdb91f9b671aa42ab210bef888d9772d12a08136512123c5d40c1035e74c92"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.38.0,<2" +opentelemetry-instrumentation = ">=0.59b0" +opentelemetry-semantic-conventions = ">=0.59b0" +opentelemetry-semantic-conventions-ai = ">=0.4.14,<0.5.0" + +[package.extras] +instruments = ["anthropic"] + +[[package]] +name = "opentelemetry-proto" +version = "1.40.0" +description = "OpenTelemetry Python Proto" +optional = false +python-versions = ">=3.9" +groups = 
["main"] +files = [ + {file = "opentelemetry_proto-1.40.0-py3-none-any.whl", hash = "sha256:266c4385d88923a23d63e353e9761af0f47a6ed0d486979777fe4de59dc9b25f"}, + {file = "opentelemetry_proto-1.40.0.tar.gz", hash = "sha256:03f639ca129ba513f5819810f5b1f42bcb371391405d99c168fe6937c62febcd"}, +] + +[package.dependencies] +protobuf = ">=5.0,<7.0" + +[[package]] +name = "opentelemetry-sdk" +version = "1.40.0" +description = "OpenTelemetry Python SDK" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_sdk-1.40.0-py3-none-any.whl", hash = "sha256:787d2154a71f4b3d81f20524a8ce061b7db667d24e46753f32a7bc48f1c1f3f1"}, + {file = "opentelemetry_sdk-1.40.0.tar.gz", hash = "sha256:18e9f5ec20d859d268c7cb3c5198c8d105d073714db3de50b593b8c1345a48f2"}, +] + +[package.dependencies] +opentelemetry-api = "1.40.0" +opentelemetry-semantic-conventions = "0.61b0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.61b0" +description = "OpenTelemetry Semantic Conventions" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_semantic_conventions-0.61b0-py3-none-any.whl", hash = "sha256:fa530a96be229795f8cef353739b618148b0fe2b4b3f005e60e262926c4d38e2"}, + {file = "opentelemetry_semantic_conventions-0.61b0.tar.gz", hash = "sha256:072f65473c5d7c6dc0355b27d6c9d1a679d63b6d4b4b16a9773062cb7e31192a"}, +] + +[package.dependencies] +opentelemetry-api = "1.40.0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-semantic-conventions-ai" +version = "0.4.16" +description = "OpenTelemetry Semantic Conventions Extension for Large Language Models" +optional = false +python-versions = "<4,>=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_semantic_conventions_ai-0.4.16-py3-none-any.whl", hash = "sha256:d5ddd0df387b969da82e3e0a8b7415e91d2fc7ce13de7efc2690a7939932b2e0"}, + {file = "opentelemetry_semantic_conventions_ai-0.4.16.tar.gz", 
hash = "sha256:572eb878d8b81e50f1e53d2a5c1b441e7d34918ee01c846ff62485204d660c22"}, +] + +[package.dependencies] +opentelemetry-sdk = ">=1.38.0,<2" +opentelemetry-semantic-conventions = ">=0.59b0" + [[package]] name = "orjson" version = "3.11.6" @@ -4553,7 +4788,7 @@ version = "5.12.0" description = "Ultra fast JSON encoder and decoder for Python" optional = false python-versions = ">=3.10" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "ujson-5.12.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:38051f36423f084b909aaadb3b41c9c6a2958e86956ba21a8489636911e87504"}, {file = "ujson-5.12.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:457fabc2700a8e6ddb85bc5a1d30d3345fe0d3ec3ee8161a4e032ec585801dfa"}, @@ -5051,6 +5286,26 @@ idna = ">=2.0" multidict = ">=4.0" propcache = ">=0.2.1" +[[package]] +name = "zipp" +version = "3.23.0" +description = "Backport of pathlib-compatible object wrapper for zip files" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"}, + {file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +type = ["pytest-mypy"] + [[package]] name = "zstandard" version = "0.23.0" @@ -5167,4 +5422,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "3.11.*" -content-hash = "2fb88714c356812fa849c9cf108c30b128f02f41f9f3b8b673255349bed7500c" +content-hash = 
"5fff654ae60691b345f62281e63642f4d0ddcd634b7d6c30bc4bb145b4e4bf24" diff --git a/pyproject.toml b/pyproject.toml index ff11a583..1aa8bc19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ pytest = "^8.4.1" aiohttp = ">=3.10,<3.11" sentry-sdk = "^2.35.0" psycopg2-binary = "^2.9.10" +langfuse = "^4.0.1" +opentelemetry-instrumentation-anthropic = "^0.53.3" [tool.poetry.group.ft] optional = true diff --git a/services/entry.py b/services/entry.py index ae8cc910..e9814fe5 100644 --- a/services/entry.py +++ b/services/entry.py @@ -9,10 +9,28 @@ load_dotenv() +# Langfuse: init after load_dotenv so env vars are available, before any Anthropic client is created +from opentelemetry.instrumentation.anthropic import AnthropicInstrumentor +AnthropicInstrumentor().instrument() + +from langfuse import Langfuse +from langfuse.span_filter import is_default_export_span + + +def _should_export_span(span): + """Drop spans marked as tracing-disabled (user has not opted in).""" + attrs = getattr(span, "attributes", None) or {} + if attrs.get("langfuse.trace.metadata.tracing_disabled") == "true": + return False + return is_default_export_span(span) + + +langfuse = Langfuse(should_export_span=_should_export_span) + env = os.getenv('ENVIRONMENT', 'unknown') trace_rates = { 'development': 1, - 'staging': 0.05, + 'staging': 0.05, 'production': 0.03, 'unknown': 0.0, } @@ -68,6 +86,8 @@ def call( sentry_sdk.capture_exception(e) result = ApolloError(code=500, message=str(e), type="INTERNAL_ERROR").to_dict() + langfuse.flush() + if output_path: with open(output_path, "w") as f: json.dump(result, f) diff --git a/services/global_chat/PAYLOAD_SPEC.md b/services/global_chat/PAYLOAD_SPEC.md index 7bdaa28d..54045e3a 100644 --- a/services/global_chat/PAYLOAD_SPEC.md +++ b/services/global_chat/PAYLOAD_SPEC.md @@ -15,8 +15,8 @@ This document defines the input and output payload structure for the Global Agen // workflows/my-workflow // workflows/my-workflow/settings - "metadata": { // 
Optional metadata - "user": {} // User info (reserved for future use) + "meta": { // Optional metadata + "session_id": "string" // Session ID for multi-turn grouping }, "history": [ // Chat history (optional) @@ -52,8 +52,8 @@ This document defines the input and output payload structure for the Global Agen - `workflows/` — user is viewing the workflow overview - `workflows//settings` — user is viewing workflow settings -- **`metadata`** (object, optional): Extensible metadata object. - - **`user`** (object, optional): User information. Reserved for future use. +- **`meta`** (object, optional): Extensible metadata object. + - **`session_id`** (string, optional): Session ID for grouping multi-turn conversations. - **`history`** (array, optional): Conversation history. Each turn has `role` and `content`. History is managed and returned by each agent internally. diff --git a/services/global_chat/README.md b/services/global_chat/README.md index 0a575a66..b0de787d 100644 --- a/services/global_chat/README.md +++ b/services/global_chat/README.md @@ -56,6 +56,10 @@ Request to build a new multi-step workflow from scratch: - `options` (optional): Runtime options object (e.g. `{stream: false}`) - `api_key` (optional): Anthropic API key; falls back to `ANTHROPIC_API_KEY` env var +- `user` (optional): User identity object with `id` (string) and `employee` + (boolean) fields — used for Langfuse trace attribution +- `metrics_opt_in` (optional): Set to `true` to enable Langfuse tracing for this + session. Currently force-enabled for all global_chat sessions. 
### Example Output diff --git a/services/global_chat/global_chat.py b/services/global_chat/global_chat.py index 6cbffb9e..f623a273 100644 --- a/services/global_chat/global_chat.py +++ b/services/global_chat/global_chat.py @@ -12,7 +12,9 @@ from pathlib import Path sys.path.append(str(Path(__file__).parent.parent)) +from langfuse import observe, propagate_attributes, get_client as get_langfuse_client from util import ApolloError, create_logger +from langfuse_util import should_track, build_tags from global_chat.config_loader import ConfigLoader from global_chat.router import RouterAgent @@ -25,11 +27,13 @@ class Payload: content: str workflow_yaml: Optional[str] = None page: Optional[str] = None - metadata: Optional[Dict] = None + meta: Optional[Dict] = None history: Optional[List[Dict]] = None options: Optional[Dict] = None api_key: Optional[str] = None attachments: Optional[List[Dict]] = None + user: Optional[Dict] = None + metrics_opt_in: Optional[bool] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Payload": @@ -41,11 +45,13 @@ def from_dict(cls, data: Dict[str, Any]) -> "Payload": content=data["content"], workflow_yaml=data.get("workflow_yaml"), page=data.get("page"), - metadata=data.get("metadata"), + meta=data.get("meta"), history=data.get("history"), options=data.get("options"), api_key=data.get("api_key"), attachments=data.get("attachments"), + user=data.get("user"), + metrics_opt_in=data.get("metrics_opt_in"), ) def get_stream(self) -> bool: @@ -53,6 +59,7 @@ def get_stream(self) -> bool: return (self.options or {}).get("stream", False) +@observe(name="global_chat", capture_input=False) def main(data_dict: dict) -> dict: """ Main entry point for global agent service. @@ -68,30 +75,45 @@ def main(data_dict: dict) -> dict: data = Payload.from_dict(data_dict) logger.info(f"Global agent called with content: {data.content[:100]}...") - # 2. Load configuration - config_loader = ConfigLoader("config.yaml") - - # 3. 
Initialize router - router = RouterAgent(config_loader, data.api_key) - - # 4. Route and execute - result = router.route_and_execute( - content=data.content, - workflow_yaml=data.workflow_yaml, - page=data.page, - history=data.history or [], - stream=data.get_stream(), - attachments=data.attachments or [] - ) - - # 5. Return structured response - return { - "response": result.response, - "attachments": result.attachments, - "history": result.history, - "usage": result.usage, - "meta": result.meta - } + session_id = data.meta.get("session_id") if data.meta else None + user_info = data.user or {} + # TEMPORARY: force tracking for all global_chat sessions — remove when relying on metrics_opt_in + tracking = should_track(data_dict, force=True) + + if tracking: + langfuse = get_langfuse_client() + langfuse.update_current_span(input=data.content) + + with propagate_attributes( + session_id=session_id, + user_id=user_info.get("id") if tracking else None, + tags=build_tags("global_chat", user_info) if tracking else None, + metadata=None if tracking else {"tracing_disabled": "true"}, + ): + # 2. Load configuration + config_loader = ConfigLoader("config.yaml") + + # 3. Initialize router + router = RouterAgent(config_loader, data.api_key) + + # 4. Route and execute + result = router.route_and_execute( + content=data.content, + workflow_yaml=data.workflow_yaml, + page=data.page, + history=data.history or [], + stream=data.get_stream(), + attachments=data.attachments or [] + ) + + # 5. 
Return structured response + return { + "response": result.response, + "attachments": result.attachments, + "history": result.history, + "usage": result.usage, + "meta": result.meta + } except ApolloError as e: logger.error(f"ApolloError in global_chat: {e}") diff --git a/services/global_chat/planner.py b/services/global_chat/planner.py index f4a59e5f..adaff2c2 100644 --- a/services/global_chat/planner.py +++ b/services/global_chat/planner.py @@ -10,6 +10,7 @@ from pathlib import Path sys.path.append(str(Path(__file__).parent.parent)) +from langfuse import observe from util import create_logger, ApolloError, sum_usage from streaming_util import StreamManager from global_chat.config_loader import ConfigLoader @@ -58,6 +59,7 @@ def __init__(self, config_loader: ConfigLoader, api_key: Optional[str] = None): logger.info(f"PlannerAgent initialized with model: {self.model}") + @observe(name="planner") def run( self, content: str, diff --git a/services/global_chat/router.py b/services/global_chat/router.py index 38c9a3b3..0fbc1084 100644 --- a/services/global_chat/router.py +++ b/services/global_chat/router.py @@ -14,6 +14,7 @@ from pathlib import Path sys.path.append(str(Path(__file__).parent.parent)) +from langfuse import observe from util import create_logger, ApolloError, sum_usage from global_chat.config_loader import ConfigLoader from models import resolve_model @@ -66,6 +67,7 @@ def __init__(self, config_loader: ConfigLoader, api_key: Optional[str] = None): logger.info(f"RouterAgent initialized with model: {self.model}") + @observe(name="router") def route_and_execute( self, content: str, @@ -116,6 +118,7 @@ def route_and_execute( return result + @observe(name="routing_decision") def _make_routing_decision( self, content: str, diff --git a/services/global_chat/subagent_caller.py b/services/global_chat/subagent_caller.py index ee6a2d1d..80dc556c 100644 --- a/services/global_chat/subagent_caller.py +++ b/services/global_chat/subagent_caller.py @@ -10,12 +10,14 @@ # 
Import utilities from parent services directory sys.path.append(str(Path(__file__).parent.parent.parent)) +from langfuse import observe from util import create_logger, ApolloError from global_chat.yaml_utils import find_job_in_yaml logger = create_logger(__name__) +@observe(name="call_workflow_agent") def call_workflow_agent( tool_input: Dict, workflow_yaml: Optional[str] = None, @@ -62,6 +64,7 @@ def call_workflow_agent( raise ApolloError(500, f"workflow_agent failed: {str(e)}") +@observe(name="call_job_agent") def call_job_agent( tool_input: Dict, workflow_yaml: Optional[str] = None, diff --git a/services/global_chat/tests/test_langfuse_tracing.py b/services/global_chat/tests/test_langfuse_tracing.py new file mode 100644 index 00000000..bf6be7ed --- /dev/null +++ b/services/global_chat/tests/test_langfuse_tracing.py @@ -0,0 +1,151 @@ +""" +Langfuse tracing integration test for global_chat. + +Runs 2 sequential conversation turns with the same session_id: + Turn 1: vague request that triggers clarification (no workflow generated) + Turn 2: specific follow-up that triggers the planner to build a workflow + +After the turns, queries Langfuse API to verify traces were created with +correct user_id, tags, and that api_key is not leaked in trace input. 
+""" +import os +import time +import uuid +import yaml +from dotenv import load_dotenv +from langfuse.api import LangfuseAPI +from .test_utils import call_global_chat_service, print_response_details, get_attachment + +load_dotenv() + +SESSION_ID = f"test-global-chat-{uuid.uuid4().hex[:8]}" +USER_ID = f"test-user-{uuid.uuid4().hex[:8]}" + + +def _get_api_client(): + """Create a Langfuse REST API client from environment variables.""" + return LangfuseAPI( + base_url=os.environ["LANGFUSE_BASE_URL"], + username=os.environ["LANGFUSE_PUBLIC_KEY"], + password=os.environ["LANGFUSE_SECRET_KEY"], + ) + + +def _fetch_traces(session_id, retries=5, delay=3): + """Poll Langfuse API for traces matching session_id.""" + client = _get_api_client() + for i in range(retries): + result = client.trace.list(session_id=session_id, fields="core,io") + if result.data and len(result.data) > 0: + return result.data + if i < retries - 1: + time.sleep(delay) + return [] + + +def test_langfuse_multi_turn_global_chat(): + """2-turn conversation: vague request then clarification, with session_id.""" + print(f"\nLangfuse session_id: {SESSION_ID}") + + # --- Turn 1: vague request that should trigger clarification --- + content_1 = "I need help with my integration" + input_1 = { + "content": content_1, + "history": [], + "meta": {"session_id": SESSION_ID}, + "user": {"id": USER_ID, "employee": True}, + "metrics_opt_in": True, + } + + response_1 = call_global_chat_service(input_1) + print_response_details(response_1, test_name="turn_1", content=content_1) + assert "response" in response_1, f"Turn 1 failed: {response_1}" + history_1 = response_1.get("history", [ + {"role": "user", "content": content_1}, + {"role": "assistant", "content": response_1["response"]}, + ]) + + # --- Turn 2: specific follow-up that should trigger planner --- + content_2 = ( + "I want to create a workflow that fetches new patient registrations " + "from CommCare every hour and creates matching tracked entities in DHIS2." 
+ ) + input_2 = { + "content": content_2, + "history": history_1, + "meta": {"session_id": SESSION_ID}, + "user": {"id": USER_ID, "employee": True}, + "metrics_opt_in": True, + } + + response_2 = call_global_chat_service(input_2) + print_response_details(response_2, test_name="turn_2", content=content_2) + assert "response" in response_2, f"Turn 2 failed: {response_2}" + + # The planner should have generated a workflow + yaml_str = get_attachment(response_2, "workflow_yaml") + if yaml_str: + parsed = yaml.safe_load(yaml_str) + print(f"\nGenerated workflow has {len(parsed.get('jobs', {}))} jobs") + + # --- Verify traces in Langfuse --- + traces = _fetch_traces(SESSION_ID) + assert len(traces) >= 1, f"Expected traces for session {SESSION_ID}, found none" + + for trace in traces: + # Verify user_id is set + assert trace.user_id == USER_ID, f"Expected user_id={USER_ID}, got {trace.user_id}" + # Verify tags include service name and employee + assert "global_chat" in trace.tags, f"Missing 'global_chat' tag: {trace.tags}" + assert "employee" in trace.tags, f"Missing 'employee' tag: {trace.tags}" + # Verify api_key is NOT in trace input (capture_input=False fix) + trace_input = str(trace.input or "") + assert "api_key" not in trace_input, f"api_key leaked in trace input: {trace_input[:200]}" + + print(f"\n All 2 turns completed and verified in Langfuse for session: {SESSION_ID}") + + +def test_langfuse_global_chat_force_tracking(): + """global_chat should trace even without metrics_opt_in (force=True).""" + force_session = f"test-global-chat-force-{uuid.uuid4().hex[:8]}" + + input_data = { + "content": "What adaptors are available?", + "history": [], + "meta": {"session_id": force_session}, + # No metrics_opt_in — should still trace because global_chat uses force=True + } + + response = call_global_chat_service(input_data) + assert "response" in response, f"Force-tracking test failed: {response}" + + traces = _fetch_traces(force_session) + assert len(traces) >= 1, ( + 
f"Expected traces for force-tracked session {force_session} even without " + f"metrics_opt_in, found none. global_chat should force-enable tracking." + ) + + print(f"\n Force-tracking verified for session: {force_session}") + + +def test_langfuse_global_chat_force_tracking_explicit_false(): + """global_chat should trace even with metrics_opt_in=False (force=True).""" + session = f"test-global-chat-force-false-{uuid.uuid4().hex[:8]}" + + input_data = { + "content": "What adaptors are available?", + "history": [], + "meta": {"session_id": session}, + "metrics_opt_in": False, + } + + response = call_global_chat_service(input_data) + assert "response" in response, f"Force-tracking (explicit false) test failed: {response}" + + traces = _fetch_traces(session) + assert len(traces) >= 1, ( + f"Expected traces for session {session} even with metrics_opt_in=False. " + f"global_chat should force-enable tracking regardless." + ) + + print(f"\n Force-tracking (explicit false) verified for session: {session}") diff --git a/services/job_chat/README.md b/services/job_chat/README.md index bb0d2cf1..4d84d9d9 100644 --- a/services/job_chat/README.md +++ b/services/job_chat/README.md @@ -90,12 +90,17 @@ The input payload is a JSON object with the following structure "stream": false, "download_adaptor_docs": true, "refresh_rag": false, - "api_key": "" + "api_key": "", + "user": { "id": "user-abc-123", "employee": true }, + "metrics_opt_in": true } ``` All context is optional, as is history. +- `user` (optional): User identity object with `id` (string) and `employee` (boolean) fields — used for Langfuse trace attribution +- `metrics_opt_in` (optional): Set to `true` to enable Langfuse tracing for this session + The `download_adaptor_docs` flag (defaults to `true`) controls whether adaptor docs are automatically loaded before building prompts. Set to `false` to skip auto-loading. 
## Response Reference diff --git a/services/job_chat/job_chat.py b/services/job_chat/job_chat.py index 462e30e3..767253e0 100644 --- a/services/job_chat/job_chat.py +++ b/services/job_chat/job_chat.py @@ -15,6 +15,8 @@ InternalServerError, ) import sentry_sdk +from langfuse import observe, propagate_attributes, get_client as get_langfuse_client +from langfuse_util import should_track, build_tags from util import ApolloError, create_logger, AdaptorSpecifier, add_page_prefix from .prompt import build_prompt, build_error_correction_prompt from .old_prompt import build_old_prompt @@ -60,6 +62,8 @@ class Payload: stream: Optional[bool] = False download_adaptor_docs: Optional[bool] = True refresh_rag: Optional[bool] = False + user: Optional[dict] = None + metrics_opt_in: Optional[bool] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Payload": @@ -77,7 +81,9 @@ def from_dict(cls, data: Dict[str, Any]) -> "Payload": suggest_code=data.get("suggest_code"), stream=data.get("stream", False), download_adaptor_docs=data.get("download_adaptor_docs", True), - refresh_rag=data.get("refresh_rag", False) + refresh_rag=data.get("refresh_rag", False), + user=data.get("user"), + metrics_opt_in=data.get("metrics_opt_in"), ) @@ -105,6 +111,7 @@ def __init__(self, config: Optional[ChatConfig] = None): raise ValueError("API key must be provided") self.client = Anthropic(api_key=self.api_key) + @staticmethod def _unescape_json_string(text): """Unescape JSON string escape sequences (e.g. \\n -> newline, \\" -> quote). 
@@ -117,7 +124,8 @@ def _unescape_json_string(text): return json.loads(f'"{text}"') except (json.JSONDecodeError, ValueError): return text - + + @observe(name="job_chat_generate") def generate( self, content: str, @@ -437,6 +445,7 @@ def handle_replace_error(self, content: str, text_answer: str, code: str, edit: return (corrected_code, success, warning) + @observe(name="job_chat_error_correction") def try_error_correction(self, content: str, error_message: str, old_code: str, new_code: str, full_code: str, text_explanation: str) -> tuple[str, bool, Optional[str]]: """Try to correct the edit once, return (code, success).""" logger.info(f"Code edit error: {error_message}. Attempting correction...") @@ -497,6 +506,7 @@ def sum_usage(self, *usage_objects): +@observe(name="job_chat", capture_input=False) def main(data_dict: dict) -> dict: """ Main entry point with improved error handling and input validation. @@ -508,6 +518,15 @@ def main(data_dict: dict) -> dict: data = Payload.from_dict(data_dict) + input_meta = data_dict.get("meta") or {} + session_id = input_meta.get("session_id") if isinstance(input_meta, dict) else None + user_info = data.user or {} + tracking = should_track(data_dict) + + if tracking: + langfuse = get_langfuse_client() + langfuse.update_current_span(input=data.content) + if data.context is None: data.context = {} @@ -528,8 +547,7 @@ def main(data_dict: dict) -> dict: logger.warning(f"Failed to parse adaptor string '{adaptor_string}': {e}") # Extract rag_data from meta if present - input_meta = data_dict.get("meta", {}) - rag_data = input_meta.get("rag") + rag_data = input_meta.get("rag") if isinstance(input_meta, dict) else None # Detect navigation by comparing current page prefix with last turn's prefix current_prefix = add_page_prefix("", current_page).strip() @@ -539,31 +557,36 @@ def main(data_dict: dict) -> dict: config = ChatConfig(api_key=data.api_key) if data.api_key else None client = AnthropicClient(config) - result = client.generate( - 
content=data.content, - history=data_dict.get("history", []), - context=data.context, - rag=rag_data, - suggest_code=data.suggest_code, - stream=data.stream, - download_adaptor_docs=data.download_adaptor_docs, - refresh_rag=should_refresh_rag, - current_page=current_page - ) - + with propagate_attributes( + session_id=session_id, + user_id=user_info.get("id") if tracking else None, + tags=build_tags("job_chat", user_info) if tracking else None, + metadata=None if tracking else {"tracing_disabled": "true"}, + ): + result = client.generate( + content=data.content, + history=data_dict.get("history", []), + context=data.context, + rag=rag_data, + suggest_code=data.suggest_code, + stream=data.stream, + download_adaptor_docs=data.download_adaptor_docs, + refresh_rag=should_refresh_rag, + current_page=current_page + ) - response_dict = { - "response": result.response, - "suggested_code": result.suggested_code, - "history": result.history, - "usage": result.usage, - "meta": {"rag": result.rag} - } + response_dict = { + "response": result.response, + "suggested_code": result.suggested_code, + "history": result.history, + "usage": result.usage, + "meta": {"rag": result.rag} + } - if result.diff: - response_dict["diff"] = result.diff + if result.diff: + response_dict["diff"] = result.diff - return response_dict + return response_dict except ValueError as e: raise ApolloError(400, str(e), type="BAD_REQUEST") diff --git a/services/job_chat/prompt.py b/services/job_chat/prompt.py index 0cbd09e9..38f4391c 100644 --- a/services/job_chat/prompt.py +++ b/services/job_chat/prompt.py @@ -1,6 +1,7 @@ import json import time import sentry_sdk +from langfuse import observe from util import create_logger, ApolloError, AdaptorSpecifier, get_db_connection from .retrieve_docs import retrieve_knowledge from search_adaptor_docs.search_adaptor_docs import fetch_signatures @@ -393,6 +394,7 @@ def format_search_results(search_results): for result in search_results ]) 
+@observe(name="job_chat_build_prompt") def build_prompt(content, history, context, rag=None, api_key=None, stream_manager=None, download_adaptor_docs=True, refresh_rag=False): retrieved_knowledge = { "search_results": [], diff --git a/services/job_chat/retrieve_docs.py b/services/job_chat/retrieve_docs.py index 0c9f6658..8607a583 100644 --- a/services/job_chat/retrieve_docs.py +++ b/services/job_chat/retrieve_docs.py @@ -12,6 +12,7 @@ InternalServerError, ) import sentry_sdk +from langfuse import observe from util import ApolloError, create_logger from models import resolve_model from search_docsite.search_docsite import DocsiteSearch @@ -37,6 +38,7 @@ def get_client(api_key=None): return anthropic.Anthropic(api_key=key) +@observe(name="job_chat_retrieve_docs") def retrieve_knowledge(content, history, code="", adaptor="", api_key=None): """ Retrieve relevant documentation sections based on user's question. diff --git a/services/job_chat/tests/test_langfuse_tracing.py b/services/job_chat/tests/test_langfuse_tracing.py new file mode 100644 index 00000000..8e30b193 --- /dev/null +++ b/services/job_chat/tests/test_langfuse_tracing.py @@ -0,0 +1,201 @@ +""" +Langfuse tracing integration test for job_chat. + +Runs 4 sequential conversation turns with the same session_id to verify +that Langfuse traces are created and grouped into a session. +Also tests that opting out produces zero traces. 
+""" +import os +import time +import uuid +from dotenv import load_dotenv +from langfuse.api import LangfuseAPI +from .test_utils import call_job_chat_service, make_service_input, print_response_details + +load_dotenv() + + +SESSION_ID = f"test-job-chat-{uuid.uuid4().hex[:8]}" +USER_ID = f"test-user-{uuid.uuid4().hex[:8]}" + +CONTEXT = { + "expression": """// Fetch patient data from FHIR server +get('Patient', { count: 100 }); + +fn(state => { + const patients = state.data.entry.map(e => e.resource); + return { ...state, patients }; +});""", + "adaptor": "@openfn/language-http@6.5.1", +} + + +def _get_api_client(): + """Create a Langfuse REST API client from environment variables.""" + return LangfuseAPI( + base_url=os.environ["LANGFUSE_BASE_URL"], + username=os.environ["LANGFUSE_PUBLIC_KEY"], + password=os.environ["LANGFUSE_SECRET_KEY"], + ) + + +def _fetch_traces(session_id, retries=5, delay=3): + """Poll Langfuse API for traces matching session_id.""" + client = _get_api_client() + for i in range(retries): + result = client.trace.list(session_id=session_id, fields="core,io") + if result.data and len(result.data) > 0: + return result.data + if i < retries - 1: + time.sleep(delay) + return [] + + +def test_langfuse_multi_turn_job_chat(): + """4-turn conversation testing Langfuse trace grouping via session_id.""" + print(f"\nLangfuse session_id: {SESSION_ID}") + + # --- Turn 1: initial question --- + content_1 = "Can you add error handling so that if the GET request fails it logs the error and returns an empty array?" 
+ input_1 = make_service_input( + history=[], + content=content_1, + context=CONTEXT, + suggest_code=True, + ) + input_1["meta"] = {"session_id": SESSION_ID} + input_1["user"] = {"id": USER_ID, "employee": True} + input_1["metrics_opt_in"] = True + + response_1 = call_job_chat_service(input_1) + print_response_details(response_1, test_name="turn_1", content=content_1) + assert "response" in response_1, f"Turn 1 failed: {response_1}" + history_1 = response_1["history"] + + # --- Turn 2: follow-up refinement --- + content_2 = "Good, but can you also add a retry with a 2 second delay before giving up?" + input_2 = make_service_input( + history=history_1, + content=content_2, + context={ + **CONTEXT, + "expression": response_1.get("suggested_code") or CONTEXT["expression"], + }, + meta={"rag": response_1.get("meta", {}).get("rag")}, + suggest_code=True, + ) + input_2["meta"]["session_id"] = SESSION_ID + input_2["user"] = {"id": USER_ID, "employee": True} + input_2["metrics_opt_in"] = True + + response_2 = call_job_chat_service(input_2) + print_response_details(response_2, test_name="turn_2", content=content_2) + assert "response" in response_2, f"Turn 2 failed: {response_2}" + history_2 = response_2["history"] + + # --- Turn 3: ask a question (no code suggestion) --- + content_3 = "What HTTP status codes would cause the retry to trigger?" 
+ input_3 = make_service_input( + history=history_2, + content=content_3, + context={ + **CONTEXT, + "expression": response_2.get("suggested_code") or CONTEXT["expression"], + }, + meta={"rag": response_2.get("meta", {}).get("rag")}, + suggest_code=False, + ) + input_3["meta"]["session_id"] = SESSION_ID + input_3["user"] = {"id": USER_ID, "employee": True} + input_3["metrics_opt_in"] = True + + response_3 = call_job_chat_service(input_3) + print_response_details(response_3, test_name="turn_3", content=content_3) + assert "response" in response_3, f"Turn 3 failed: {response_3}" + history_3 = response_3["history"] + + # --- Turn 4: back to code changes --- + content_4 = "Ok, let's only retry on 429 and 503 status codes. Update the code." + input_4 = make_service_input( + history=history_3, + content=content_4, + context={ + **CONTEXT, + "expression": response_2.get("suggested_code") or CONTEXT["expression"], + }, + meta={"rag": response_3.get("meta", {}).get("rag") if "meta" in response_3 else None}, + suggest_code=True, + ) + input_4["meta"]["session_id"] = SESSION_ID + input_4["user"] = {"id": USER_ID, "employee": True} + input_4["metrics_opt_in"] = True + + response_4 = call_job_chat_service(input_4) + print_response_details(response_4, test_name="turn_4", content=content_4) + assert "response" in response_4, f"Turn 4 failed: {response_4}" + + # --- Verify traces in Langfuse --- + traces = _fetch_traces(SESSION_ID) + assert len(traces) >= 1, f"Expected traces for session {SESSION_ID}, found none" + + for trace in traces: + assert trace.user_id == USER_ID, f"Expected user_id={USER_ID}, got {trace.user_id}" + assert "job_chat" in trace.tags, f"Missing 'job_chat' tag: {trace.tags}" + assert "employee" in trace.tags, f"Missing 'employee' tag: {trace.tags}" + trace_input = str(trace.input or "") + assert "api_key" not in trace_input, f"api_key leaked in trace input: {trace_input[:200]}" + + print(f"\n All 4 turns completed and verified in Langfuse for session: 
{SESSION_ID}") + + +def test_langfuse_job_chat_opt_out(): + """job_chat with metrics_opt_in absent should produce zero traces.""" + opt_out_session = f"test-job-chat-optout-{uuid.uuid4().hex[:8]}" + + input_data = make_service_input( + history=[], + content="How do I use the HTTP adaptor?", + context=CONTEXT, + suggest_code=False, + ) + input_data["meta"] = {"session_id": opt_out_session} + # No user, no metrics_opt_in + + response = call_job_chat_service(input_data) + assert "response" in response, f"Opt-out test failed: {response}" + + # Wait briefly then confirm no traces were exported + time.sleep(5) + traces = _fetch_traces(opt_out_session, retries=1, delay=0) + assert len(traces) == 0, ( + f"Expected zero traces for opted-out session {opt_out_session}, " + f"found {len(traces)}. The should_export_span filter may not be working." + ) + + print(f"\n Opt-out verified (absent): zero traces for session: {opt_out_session}") + + +def test_langfuse_job_chat_opt_out_explicit_false(): + """job_chat with metrics_opt_in=False should produce zero traces.""" + session = f"test-job-chat-optout-false-{uuid.uuid4().hex[:8]}" + + input_data = make_service_input( + history=[], + content="How do I use the HTTP adaptor?", + context=CONTEXT, + suggest_code=False, + ) + input_data["meta"] = {"session_id": session} + input_data["metrics_opt_in"] = False + + response = call_job_chat_service(input_data) + assert "response" in response, f"Opt-out (explicit false) test failed: {response}" + + time.sleep(5) + traces = _fetch_traces(session, retries=1, delay=0) + assert len(traces) == 0, ( + f"Expected zero traces for session {session} with metrics_opt_in=False, " + f"found {len(traces)}." 
+ ) + + print(f"\n Opt-out verified (explicit false): zero traces for session: {session}") diff --git a/services/langfuse_util.py b/services/langfuse_util.py new file mode 100644 index 00000000..4b11a179 --- /dev/null +++ b/services/langfuse_util.py @@ -0,0 +1,16 @@ +"""Langfuse tracking utilities for controlling trace export and tagging.""" + + +def should_track(data_dict: dict, force: bool = False) -> bool: + """Check if this session should be tracked in Langfuse.""" + if force: + return True + return bool(data_dict.get("metrics_opt_in")) + + +def build_tags(service_name: str, user_info: dict) -> list: + """Build Langfuse tags list from service name and user info.""" + tags = [service_name] + if user_info.get("employee"): + tags.append("employee") + return tags diff --git a/services/workflow_chat/README.md b/services/workflow_chat/README.md index 930bd810..15bfa645 100644 --- a/services/workflow_chat/README.md +++ b/services/workflow_chat/README.md @@ -42,6 +42,10 @@ Simple input: - `stream` (optional, default: `false`): Enable streaming response - `read_only` (optional, default: `false`): Enable read-only mode (IDs removed, no code preservation) - `api_key` (optional): Anthropic API key (falls back to environment variable) +- `user` (optional): User identity object with `id` (string) and `employee` + (boolean) fields — used for Langfuse trace attribution +- `metrics_opt_in` (optional): Set to `true` to enable Langfuse tracing for this + session Second conversation turn example: diff --git a/services/workflow_chat/tests/test_langfuse_tracing.py b/services/workflow_chat/tests/test_langfuse_tracing.py new file mode 100644 index 00000000..f32700d2 --- /dev/null +++ b/services/workflow_chat/tests/test_langfuse_tracing.py @@ -0,0 +1,178 @@ +""" +Langfuse tracing integration test for workflow_chat. + +Runs 4 sequential conversation turns with the same session_id to verify +that Langfuse traces are created and grouped into a session. 
+Also tests that opting out produces zero traces. +""" +import os +import time +import uuid +from dotenv import load_dotenv +from langfuse.api import LangfuseAPI +from .test_utils import call_workflow_chat_service, make_service_input, print_response_details + +load_dotenv() + + +SESSION_ID = f"test-workflow-chat-{uuid.uuid4().hex[:8]}" +USER_ID = f"test-user-{uuid.uuid4().hex[:8]}" + + +def _get_api_client(): + """Create a Langfuse REST API client from environment variables.""" + return LangfuseAPI( + base_url=os.environ["LANGFUSE_BASE_URL"], + username=os.environ["LANGFUSE_PUBLIC_KEY"], + password=os.environ["LANGFUSE_SECRET_KEY"], + ) + + +def _fetch_traces(session_id, retries=5, delay=3): + """Poll Langfuse API for traces matching session_id.""" + client = _get_api_client() + for i in range(retries): + result = client.trace.list(session_id=session_id, fields="core,io") + if result.data and len(result.data) > 0: + return result.data + if i < retries - 1: + time.sleep(delay) + return [] + + +def test_langfuse_multi_turn_workflow_chat(): + """4-turn conversation testing Langfuse trace grouping via session_id.""" + print(f"\nLangfuse session_id: {SESSION_ID}") + + # --- Turn 1: create a new workflow from scratch --- + content_1 = "Create a workflow that receives patient data from a webhook, validates the required fields exist, and then sends the data to a DHIS2 instance." 
+ input_1 = make_service_input( + existing_yaml="", + history=[], + content=content_1, + ) + input_1["meta"] = {"session_id": SESSION_ID} + input_1["user"] = {"id": USER_ID, "employee": True} + input_1["metrics_opt_in"] = True + + response_1 = call_workflow_chat_service(input_1) + print_response_details(response_1, content=content_1) + assert "response" in response_1, f"Turn 1 failed: {response_1}" + assert "response_yaml" in response_1, f"Turn 1 missing YAML: {response_1}" + yaml_1 = response_1["response_yaml"] + history_1 = response_1["history"] + + # --- Turn 2: modify the workflow --- + content_2 = "Add a step between validation and DHIS2 that maps the patient fields to the DHIS2 tracked entity format." + input_2 = make_service_input( + existing_yaml=yaml_1, + history=history_1, + content=content_2, + ) + input_2["meta"] = {"session_id": SESSION_ID} + input_2["user"] = {"id": USER_ID, "employee": True} + input_2["metrics_opt_in"] = True + + response_2 = call_workflow_chat_service(input_2) + print_response_details(response_2, content=content_2) + assert "response" in response_2, f"Turn 2 failed: {response_2}" + assert "response_yaml" in response_2, f"Turn 2 missing YAML: {response_2}" + yaml_2 = response_2["response_yaml"] + history_2 = response_2["history"] + + # --- Turn 3: add error handling path --- + content_3 = "If validation fails, instead of stopping the workflow, send the failed record to a Google Sheet for manual review." 
+ input_3 = make_service_input( + existing_yaml=yaml_2, + history=history_2, + content=content_3, + ) + input_3["meta"] = {"session_id": SESSION_ID} + input_3["user"] = {"id": USER_ID, "employee": True} + input_3["metrics_opt_in"] = True + + response_3 = call_workflow_chat_service(input_3) + print_response_details(response_3, content=content_3) + assert "response" in response_3, f"Turn 3 failed: {response_3}" + assert "response_yaml" in response_3, f"Turn 3 missing YAML: {response_3}" + yaml_3 = response_3["response_yaml"] + history_3 = response_3["history"] + + # --- Turn 4: ask about the workflow --- + content_4 = "Can you change the webhook trigger to a cron trigger that runs every 15 minutes?" + input_4 = make_service_input( + existing_yaml=yaml_3, + history=history_3, + content=content_4, + ) + input_4["meta"] = {"session_id": SESSION_ID} + input_4["user"] = {"id": USER_ID, "employee": True} + input_4["metrics_opt_in"] = True + + response_4 = call_workflow_chat_service(input_4) + print_response_details(response_4, content=content_4) + assert "response" in response_4, f"Turn 4 failed: {response_4}" + + # --- Verify traces in Langfuse --- + traces = _fetch_traces(SESSION_ID) + assert len(traces) >= 1, f"Expected traces for session {SESSION_ID}, found none" + + for trace in traces: + assert trace.user_id == USER_ID, f"Expected user_id={USER_ID}, got {trace.user_id}" + assert "workflow_chat" in trace.tags, f"Missing 'workflow_chat' tag: {trace.tags}" + assert "employee" in trace.tags, f"Missing 'employee' tag: {trace.tags}" + trace_input = str(trace.input or "") + assert "api_key" not in trace_input, f"api_key leaked in trace input: {trace_input[:200]}" + + print(f"\n All 4 turns completed and verified in Langfuse for session: {SESSION_ID}") + + +def test_langfuse_workflow_chat_opt_out(): + """workflow_chat with metrics_opt_in absent should produce zero traces.""" + opt_out_session = f"test-workflow-chat-optout-{uuid.uuid4().hex[:8]}" + + input_data = 
make_service_input( + existing_yaml="", + history=[], + content="Create a simple webhook to HTTP workflow", + ) + input_data["meta"] = {"session_id": opt_out_session} + # No user, no metrics_opt_in + + response = call_workflow_chat_service(input_data) + assert "response" in response, f"Opt-out test failed: {response}" + + # Wait briefly then confirm no traces were exported + time.sleep(5) + traces = _fetch_traces(opt_out_session, retries=1, delay=0) + assert len(traces) == 0, ( + f"Expected zero traces for opted-out session {opt_out_session}, " + f"found {len(traces)}. The should_export_span filter may not be working." + ) + + print(f"\n Opt-out verified (absent): zero traces for session: {opt_out_session}") + + +def test_langfuse_workflow_chat_opt_out_explicit_false(): + """workflow_chat with metrics_opt_in=False should produce zero traces.""" + session = f"test-workflow-chat-optout-false-{uuid.uuid4().hex[:8]}" + + input_data = make_service_input( + existing_yaml="", + history=[], + content="Create a simple webhook to HTTP workflow", + ) + input_data["meta"] = {"session_id": session} + input_data["metrics_opt_in"] = False + + response = call_workflow_chat_service(input_data) + assert "response" in response, f"Opt-out (explicit false) test failed: {response}" + + time.sleep(5) + traces = _fetch_traces(session, retries=1, delay=0) + assert len(traces) == 0, ( + f"Expected zero traces for session {session} with metrics_opt_in=False, " + f"found {len(traces)}." 
+    )
+
+    print(f"\n Opt-out verified (explicit false): zero traces for session: {session}")
diff --git a/services/workflow_chat/workflow_chat.py b/services/workflow_chat/workflow_chat.py
index e1932a4c..95af209e 100644
--- a/services/workflow_chat/workflow_chat.py
+++ b/services/workflow_chat/workflow_chat.py
@@ -25,6 +25,8 @@
     InternalServerError,
 )
 import sentry_sdk
+from langfuse import observe, propagate_attributes, get_client as get_langfuse_client
+from langfuse_util import should_track, build_tags
 from util import ApolloError, create_logger, add_page_prefix
 from .gen_project_prompt import build_prompt
 from workflow_chat.available_adaptors import get_available_adaptors
@@ -62,8 +64,11 @@ class Payload:
     history: Optional[List[Dict[str, str]]] = None
     context: Optional[dict] = None
     api_key: Optional[str] = None
+    meta: Optional[dict] = None
     stream: Optional[bool] = False
     read_only: Optional[bool] = False
+    user: Optional[dict] = None
+    metrics_opt_in: Optional[bool] = None
 
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> "Payload":
@@ -78,8 +83,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "Payload":
             history=data.get("history", []),
             context=data.get("context"),
             api_key=data.get("api_key"),
+            meta=data.get("meta"),
             stream=data.get("stream", False),
-            read_only=data.get("read_only", False)
+            read_only=data.get("read_only", False),
+            user=data.get("user"),
+            metrics_opt_in=data.get("metrics_opt_in"),
         )
 
@@ -117,7 +125,8 @@ def _unescape_json_string(text):
             return json.loads(f'"{text}"')
         except (json.JSONDecodeError, ValueError):
             return text
-
+
+    @observe(name="workflow_chat_generate")
     def generate(
         self,
         content: str = None,
@@ -544,6 +553,7 @@ def process_stream_event(self, event, accumulated_response, text_started, sent_l
         return accumulated_response, text_started, sent_length
 
 
+@observe(name="workflow_chat", capture_input=False)
 def main(data_dict: dict) -> dict:
     """
     Main entry point with improved error handling and input validation.
@@ -555,6 +565,15 @@ def main(data_dict: dict) -> dict: data = Payload.from_dict(data_dict) + input_meta = data_dict.get("meta") or {} + session_id = input_meta.get("session_id") if isinstance(input_meta, dict) else None + user_info = data.user or {} + tracking = should_track(data_dict) + + if tracking: + langfuse = get_langfuse_client() + langfuse.update_current_span(input=data.content) + if data.context is None: data.context = {} @@ -568,25 +587,31 @@ def main(data_dict: dict) -> dict: config = ChatConfig(api_key=data.api_key) if data.api_key else None client = AnthropicClient(config) - result = client.generate( - content=data.content, - existing_yaml=data.existing_yaml, - errors=data.errors, - history=data.history, - stream=data.stream, - current_page=current_page, - read_only=data.read_only - ) - - # Build response - response_dict = { - "response": result.content, - "response_yaml": result.content_yaml, - "history": result.history, - "usage": result.usage - } + with propagate_attributes( + session_id=session_id, + user_id=user_info.get("id") if tracking else None, + tags=build_tags("workflow_chat", user_info) if tracking else None, + metadata=None if tracking else {"tracing_disabled": "true"}, + ): + result = client.generate( + content=data.content, + existing_yaml=data.existing_yaml, + errors=data.errors, + history=data.history, + stream=data.stream, + current_page=current_page, + read_only=data.read_only + ) + + # Build response + response_dict = { + "response": result.content, + "response_yaml": result.content_yaml, + "history": result.history, + "usage": result.usage + } - return response_dict + return response_dict except ValueError as e: raise ApolloError(400, str(e), type="BAD_REQUEST")