-
Notifications
You must be signed in to change notification settings - Fork 617
UN-3266 [FEAT] Async Executor Backend for Prompt Studio #1849
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2da4907
41eeef8
f66dfb2
95c6592
d8cc6cc
44a2b3f
2f4f2dc
d041201
0a0cfb1
a4e1fd7
ae77d6a
5c22956
3cc3213
d0532f8
6173df5
bbe6f58
a3dc912
98c8071
21157ac
0216b59
db81b9d
e1da202
d119797
fbadbf8
882296e
6d3bbbf
292460b
f35c0e6
9bcb458
0cbd10a
2b1ab1e
4122f08
1ceb352
d69304d
7c1266b
0b84d9e
5b0629d
98ee4b9
2dffcef
3b35fb2
1ab6031
15c3daf
7ae1a74
fbf9c29
ec2f762
d6a3c5e
5c23ab0
525024f
a8cbce1
549f17a
f9b86a9
5369e5a
b5205ff
9659661
67eef62
3f4cc7d
a563a35
9b422da
6a6e8e9
817fc1c
d9bc50f
b715f64
e9c23b2
f59755a
4bf9736
0531870
a2edb23
3f86131
45e61c4
6391c6c
0af0484
807e405
9bdb3f5
18eafe9
7a01a35
3e5ce31
e3ca0c6
db3d8c2
a62a9fd
b3a90af
4200ac1
1c58eb9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -258,8 +258,11 @@ def execute_workflow( | |
| result.status_api = DeploymentHelper.construct_status_endpoint( | ||
| api_endpoint=api.api_endpoint, execution_id=execution_id | ||
| ) | ||
| # Check if highlight data should be removed using configuration registry | ||
| # Ensure workflow identification keys are always in item metadata | ||
| organization = api.organization if api else None | ||
| org_id = str(organization.organization_id) if organization else "" | ||
| cls._enrich_result_with_workflow_metadata(result, organization_id=org_id) | ||
| # Check if highlight data should be removed using configuration registry | ||
| enable_highlight = False # Safe default if the key is unavailable (e.g., OSS) | ||
| from configuration.config_registry import ConfigurationRegistry | ||
|
|
||
|
|
@@ -273,8 +276,10 @@ def execute_workflow( | |
| if not enable_highlight: | ||
| result.remove_result_metadata_keys(["highlight_data"]) | ||
| result.remove_result_metadata_keys(["extracted_text"]) | ||
| if not include_metadata: | ||
| result.remove_result_metadata_keys() | ||
| if include_metadata or include_metrics: | ||
| cls._enrich_result_with_usage_metadata(result) | ||
| if not include_metadata and not include_metrics: | ||
| result.remove_inner_result_metadata() | ||
| if not include_metrics: | ||
| result.remove_result_metrics() | ||
| except Exception as error: | ||
|
|
@@ -293,6 +298,144 @@ def execute_workflow( | |
| ) | ||
| return APIExecutionResponseSerializer(result).data | ||
|
|
||
| @staticmethod | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I hope there is no structural change to `result` here. Can you please add the model/sample to the description, or alongside the |
||
| def _enrich_item_inner_metadata( | ||
| item: dict, file_exec_id: str, usage_helper: Any | ||
| ) -> None: | ||
| """Inject per-model usage breakdown into item['result']['metadata'].""" | ||
| inner_result = item.get("result") | ||
| if not isinstance(inner_result, dict): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: improve/enhance class ExecutionResponse by adding a dto for result |
||
| return | ||
| metadata = inner_result.get("metadata") | ||
| if not isinstance(metadata, dict): | ||
| return | ||
| usage_by_model = usage_helper.get_usage_by_model(file_exec_id) | ||
| if usage_by_model: | ||
| metadata.update(usage_by_model) | ||
|
|
||
| @staticmethod | ||
| def _enrich_item_top_metadata( | ||
| item: dict, file_exec_id: str, usage_helper: Any | ||
| ) -> None: | ||
| """Inject aggregated usage totals into item['metadata']['usage'].""" | ||
| item_metadata = item.get("metadata") | ||
| if not isinstance(item_metadata, dict): | ||
| return | ||
| aggregated = usage_helper.get_aggregated_token_count(file_exec_id) | ||
| if aggregated: | ||
| aggregated["file_execution_id"] = file_exec_id | ||
| item_metadata["usage"] = aggregated | ||
|
|
||
| @staticmethod | ||
| def _enrich_result_with_usage_metadata(result: ExecutionResponse) -> None: | ||
| """Enrich each file result's metadata with usage data. | ||
|
|
||
| For each file_execution_id: | ||
| 1. Injects per-model cost arrays (extraction_llm, challenge_llm, | ||
| embedding) into item["result"]["metadata"]. | ||
| 2. Injects aggregated usage totals into item["metadata"]["usage"], | ||
| matching the legacy response format. | ||
| """ | ||
| if not isinstance(result.result, list): | ||
| return | ||
|
|
||
| from usage_v2.helper import UsageHelper | ||
|
|
||
| for item in result.result: | ||
| if not isinstance(item, dict): | ||
| continue | ||
| file_exec_id = item.get("file_execution_id") | ||
| if not file_exec_id: | ||
| continue | ||
| DeploymentHelper._enrich_item_inner_metadata(item, file_exec_id, UsageHelper) | ||
| DeploymentHelper._enrich_item_top_metadata(item, file_exec_id, UsageHelper) | ||
|
|
||
| @staticmethod | ||
| def _enrich_item_workflow_metadata( | ||
| item: dict, | ||
| file_exec_id: str, | ||
| fe_lookup: dict, | ||
| workflow_execution: Any, | ||
| organization_id: str, | ||
| tag_names: list[str], | ||
| ) -> None: | ||
| """Populate workflow identification keys into item['metadata'].""" | ||
| if not isinstance(item.get("metadata"), dict): | ||
| item["metadata"] = {} | ||
| metadata = item["metadata"] | ||
| fe = fe_lookup.get(str(file_exec_id)) | ||
| we = fe.workflow_execution if fe else workflow_execution | ||
| if fe: | ||
| metadata.setdefault("source_name", fe.file_name) | ||
| metadata.setdefault("source_hash", fe.file_hash or "") | ||
| metadata.setdefault("file_execution_id", str(fe.id)) | ||
| metadata.setdefault("total_elapsed_time", fe.execution_time) | ||
| if we: | ||
| metadata.setdefault("workflow_id", str(we.workflow_id)) | ||
| metadata.setdefault("execution_id", str(we.id)) | ||
| metadata.setdefault( | ||
| "workflow_start_time", | ||
| we.created_at.timestamp() if we.created_at else None, | ||
| ) | ||
| metadata.setdefault("organization_id", organization_id) | ||
| metadata.setdefault("tags", tag_names) | ||
|
|
||
| @staticmethod | ||
| def _enrich_result_with_workflow_metadata( | ||
| result: ExecutionResponse, | ||
| organization_id: str, | ||
| ) -> None: | ||
| """Ensure workflow identification keys are always present in item metadata. | ||
|
|
||
| Uses setdefault() — fills in MISSING keys only, never overwrites | ||
| values already present from the workers cache. | ||
| """ | ||
| if not isinstance(result.result, list): | ||
| return | ||
|
|
||
| from workflow_manager.file_execution.models import WorkflowFileExecution | ||
|
|
||
| # 1. Collect file_execution_ids | ||
| file_exec_ids = [ | ||
| item.get("file_execution_id") | ||
| for item in result.result | ||
| if isinstance(item, dict) and item.get("file_execution_id") | ||
| ] | ||
| if not file_exec_ids: | ||
| return | ||
|
|
||
| # 2. Batch query (single JOIN query for all file executions) | ||
| fe_lookup = { | ||
| str(fe.id): fe | ||
| for fe in WorkflowFileExecution.objects.filter( | ||
| id__in=file_exec_ids | ||
| ).select_related("workflow_execution") | ||
| } | ||
|
|
||
| # 3. Get execution-level data (tags) — one M2M query | ||
| workflow_execution = None | ||
| tag_names: list[str] = [] | ||
| if fe_lookup: | ||
| first_fe = next(iter(fe_lookup.values())) | ||
| workflow_execution = first_fe.workflow_execution | ||
| tag_names = list(workflow_execution.tags.values_list("name", flat=True)) | ||
|
|
||
| # 4. Enrich each item | ||
| for item in result.result: | ||
| if not isinstance(item, dict): | ||
| continue | ||
| file_exec_id = item.get("file_execution_id") | ||
| if not file_exec_id: | ||
| continue | ||
| DeploymentHelper._enrich_item_workflow_metadata( | ||
| item=item, | ||
| file_exec_id=file_exec_id, | ||
| fe_lookup=fe_lookup, | ||
| workflow_execution=workflow_execution, | ||
| organization_id=organization_id, | ||
| tag_names=tag_names, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def get_execution_status(execution_id: str) -> ExecutionResponse: | ||
| """Current status of api execution. | ||
|
|
@@ -308,6 +451,42 @@ def get_execution_status(execution_id: str) -> ExecutionResponse: | |
| ) | ||
| return execution_response | ||
|
|
||
| @staticmethod | ||
| def process_completed_execution( | ||
| response: ExecutionResponse, | ||
| deployment_execution_dto: Any, | ||
| include_metadata: bool, | ||
| include_metrics: bool, | ||
| ) -> None: | ||
| """Enrich and clean up the response for a completed execution.""" | ||
| from configuration.config_registry import ConfigurationRegistry | ||
|
|
||
| api_deployment = deployment_execution_dto.api | ||
| organization = api_deployment.organization if api_deployment else None | ||
| org_id = str(organization.organization_id) if organization else "" | ||
| DeploymentHelper._enrich_result_with_workflow_metadata( | ||
| response, organization_id=org_id | ||
| ) | ||
| enable_highlight = False | ||
| if ConfigurationRegistry.is_config_key_available( | ||
| "ENABLE_HIGHLIGHT_API_DEPLOYMENT" | ||
| ): | ||
| from configuration.models import Configuration | ||
|
|
||
| enable_highlight = Configuration.get_value_by_organization( | ||
| config_key="ENABLE_HIGHLIGHT_API_DEPLOYMENT", | ||
| organization=organization, | ||
| ) | ||
| if not enable_highlight: | ||
| response.remove_result_metadata_keys(["highlight_data"]) | ||
| response.remove_result_metadata_keys(["extracted_text"]) | ||
| if include_metadata or include_metrics: | ||
| DeploymentHelper._enrich_result_with_usage_metadata(response) | ||
| if not include_metadata and not include_metrics: | ||
| response.remove_inner_result_metadata() | ||
| if not include_metrics: | ||
| response.remove_result_metrics() | ||
|
|
||
| @staticmethod | ||
| def fetch_presigned_file(url: str) -> InMemoryUploadedFile: | ||
| """Fetch a file from a presigned URL and convert it to an uploaded file. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| """Lightweight Celery app for dispatching tasks to worker-v2 workers. | ||
|
|
||
| The Django backend already has a Celery app for internal tasks (beat, | ||
| periodic tasks, etc.) whose broker URL is set via CELERY_BROKER_URL. | ||
| Workers use the same broker. This module provides a second Celery app | ||
| instance that reuses the same broker URL (from Django settings) but | ||
| bypasses Celery's env-var-takes-priority behaviour so it can coexist | ||
| with the main Django Celery app in the same process. | ||
|
|
||
| Problem: Celery reads the ``CELERY_BROKER_URL`` environment variable | ||
| with highest priority — overriding constructor args, ``conf.update()``, | ||
| and ``config_from_object()``. | ||
|
|
||
| Solution: Subclass Celery and override ``connection_for_write`` / | ||
| ``connection_for_read`` so they always use our explicit broker URL, | ||
| bypassing the config resolution chain entirely. | ||
| """ | ||
|
|
||
| import logging | ||
| from urllib.parse import quote_plus | ||
|
|
||
| from celery import Celery | ||
| from django.conf import settings | ||
| from kombu import Queue | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _worker_app: Celery | None = None | ||
|
|
||
|
|
||
| class _WorkerDispatchCelery(Celery): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why Celery here? We already moved it out of the backend. What do these methods do here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @muhammad-ali-e The backend Celery worker handles fire-and-forget callback tasks that run after the executor worker finishes. Here's the flow: Backend dispatches task → Executor Worker (does the heavy lifting) Why these run on the backend (not the executor worker):
|
||
| """Celery subclass that forces an explicit broker URL. | ||
|
|
||
| Works around Celery's env-var-takes-priority behaviour where | ||
| ``CELERY_BROKER_URL`` always overrides per-app configuration. | ||
| The connection methods are the actual points where Celery opens | ||
| AMQP/Redis connections, so overriding them is both sufficient | ||
| and safe. | ||
| """ | ||
|
|
||
| _explicit_broker: str | None = None | ||
|
|
||
| def connection_for_write(self, url=None, *args, **kwargs): | ||
| return super().connection_for_write(url or self._explicit_broker, *args, **kwargs) | ||
|
|
||
| def connection_for_read(self, url=None, *args, **kwargs): | ||
| return super().connection_for_read(url or self._explicit_broker, *args, **kwargs) | ||
|
|
||
|
|
||
| def get_worker_celery_app() -> Celery: | ||
| """Get or create a Celery app for dispatching to worker-v2 workers. | ||
|
|
||
| The app uses: | ||
| - Same broker as the workers (built from CELERY_BROKER_BASE_URL, | ||
| CELERY_BROKER_USER, CELERY_BROKER_PASS via Django settings) | ||
| - Same PostgreSQL result backend as the Django Celery app | ||
|
|
||
| Returns: | ||
| Celery app configured for worker-v2 dispatch. | ||
| """ | ||
| global _worker_app | ||
| if _worker_app is not None: | ||
| return _worker_app | ||
|
Comment on lines
+61
to
+63
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
if _worker_app is not None:
return _worker_appUnder gunicorn with threaded workers (or any multi-threaded Django deployment), two threads can simultaneously see The idiomatic Python fix is to use a module-level lock: import threading
_worker_app: Celery | None = None
_worker_app_lock = threading.Lock()
def get_worker_celery_app() -> Celery:
global _worker_app
if _worker_app is not None:
return _worker_app
with _worker_app_lock:
if _worker_app is None: # re-check inside lock
...
_worker_app = app
return _worker_appPrompt To Fix With AIThis is a comment left during a code review.
Path: backend/backend/worker_celery.py
Line: 61-63
Comment:
**Unsynchronised singleton initialisation — race condition under concurrent requests**
`get_worker_celery_app()` uses the classic double-check-without-lock pattern:
```python
if _worker_app is not None:
return _worker_app
```
Under gunicorn with threaded workers (or any multi-threaded Django deployment), two threads can simultaneously see `_worker_app is None` and both proceed to create a new `_WorkerDispatchCelery` instance. The second assignment overwrites the first (last-writer-wins), so each thread may end up holding a reference to a *different* object than what ends up in the module global. This is benign in practice because both instances are configured identically, but it is wasteful and could cause subtle issues if Celery connection pools are per-instance.
The idiomatic Python fix is to use a module-level lock:
```python
import threading
_worker_app: Celery | None = None
_worker_app_lock = threading.Lock()
def get_worker_celery_app() -> Celery:
global _worker_app
if _worker_app is not None:
return _worker_app
with _worker_app_lock:
if _worker_app is None: # re-check inside lock
...
_worker_app = app
return _worker_app
```
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| # Reuse the broker URL already built by Django settings (base.py) | ||
| # from CELERY_BROKER_BASE_URL + CELERY_BROKER_USER + CELERY_BROKER_PASS | ||
| broker_url = settings.CELERY_BROKER_URL | ||
|
|
||
| # Reuse the same PostgreSQL result backend as Django's Celery app | ||
| result_backend = ( | ||
| f"db+postgresql://{settings.DB_USER}:" | ||
| f"{quote_plus(settings.DB_PASSWORD)}" | ||
| f"@{settings.DB_HOST}:{settings.DB_PORT}/" | ||
| f"{settings.CELERY_BACKEND_DB_NAME}" | ||
|
Comment on lines
+70
to
+74
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AIThis is a comment left during a code review.
Path: backend/backend/worker_celery.py
Line: 70-74
Comment:
**`settings.DB_USER` is not URL-encoded in the result backend URL**
`quote_plus` is correctly applied to `DB_PASSWORD`, but `DB_USER` is interpolated raw. If the database username contains any URL-special characters (e.g. `@`, `:`, `/`), the resulting connection string would be malformed and the Celery result backend would fail to connect. Apply the same `quote_plus` encoding to `settings.DB_USER` for consistency and correctness, just as is done for `settings.DB_PASSWORD`.
How can I resolve this? If you propose a fix, please make it concise. |
||
| ) | ||
|
|
||
| app = _WorkerDispatchCelery( | ||
| "worker-dispatch", | ||
| set_as_current=False, | ||
| fixups=[], | ||
| ) | ||
| # Store the explicit broker URL for use in connection overrides | ||
| app._explicit_broker = broker_url | ||
|
|
||
| app.conf.update( | ||
| result_backend=result_backend, | ||
| task_queues=[Queue("executor")], | ||
| task_serializer="json", | ||
| accept_content=["json"], | ||
| result_serializer="json", | ||
| result_extended=True, | ||
| ) | ||
|
Comment on lines
+85
to
+92
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Configured queue name
The queue declared on the app ( Either align the queue name to Prompt To Fix With AIThis is a comment left during a code review.
Path: backend/backend/worker_celery.py
Line: 85-92
Comment:
**Configured queue name `"executor"` doesn't match the actual dispatch queue**
`get_worker_celery_app()` registers `task_queues=[Queue("executor")]`, but `ExecutionDispatcher._get_queue()` (in `sdk1/execution/dispatcher.py`) constructs the actual queue name as `celery_executor_{executor_name}` — for the legacy executor this becomes `"celery_executor_legacy"`.
The queue declared on the app (`"executor"`) never matches the queue used by `send_task`, so this `task_queues` setting has no practical effect. While `send_task` with an explicit `queue` parameter bypasses queue routing and the task is delivered correctly, the misconfigured `task_queues` setting means any queue-routing policies (e.g. prefetch limits, fair scheduling) configured on `"executor"` will not apply.
Either align the queue name to `"celery_executor_legacy"` (or the appropriate prefix), or remove the stale `task_queues` declaration from this app's config if it is intentionally unused.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| _worker_app = app | ||
| # Log broker host only (mask credentials) | ||
| safe_broker = broker_url.split("@")[-1] if "@" in broker_url else broker_url | ||
| safe_backend = ( | ||
| result_backend.split("@")[-1] if "@" in result_backend else result_backend | ||
| ) | ||
| logger.info( | ||
| "Created worker dispatch Celery app (broker=%s, result_backend=%s)", | ||
| safe_broker, | ||
| safe_backend, | ||
| ) | ||
| return _worker_app | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t think this should be allowed when the organization is missing. Also, how does it work with an empty org_id?
cc: @vishnuszipstack