diff --git a/.envs/.ci/.django b/.envs/.ci/.django index 6577ebece..b8869b2cb 100644 --- a/.envs/.ci/.django +++ b/.envs/.ci/.django @@ -30,3 +30,8 @@ RABBITMQ_DEFAULT_PASS=rabbitpass # NATS # ------------------------------------------------------------------------------ NATS_URL=nats://nats:4222 + +# Enable idempotent bootstrap for CI so processing services can self-register. +DJANGO_SUPERUSER_EMAIL=antenna@insectai.org +DJANGO_SUPERUSER_PASSWORD=localadmin +ENSURE_DEFAULT_PROJECT=1 diff --git a/.envs/.local/.django b/.envs/.local/.django index 5f65ac07c..004a02149 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -61,3 +61,7 @@ MINIO_BROWSER_REDIRECT_URL=http://minio:9001 DEFAULT_PROCESSING_SERVICE_NAME=Local Processing Service DEFAULT_PROCESSING_SERVICE_ENDPOINT=http://ml_backend:2000 # DEFAULT_PIPELINES_ENABLED=random,constant # When set to None, all pipelines will be enabled. + +# Idempotent local/CI bootstrap (ami/main/management/commands/ensure_default_project.py) +# Ensures a default superuser + project exist so processing services can self-register. +ENSURE_DEFAULT_PROJECT=1 diff --git a/ami/main/management/commands/ensure_default_project.py b/ami/main/management/commands/ensure_default_project.py new file mode 100644 index 000000000..f5e2f0bba --- /dev/null +++ b/ami/main/management/commands/ensure_default_project.py @@ -0,0 +1,77 @@ +""" +Idempotent bootstrap command for local dev and CI. + +Ensures a superuser and a named Project exist so that local-dev / CI processing +services can register pipelines against Antenna without any manual setup step. + +Guarded behind the ENSURE_DEFAULT_PROJECT env var so production deployments +never run it accidentally. Intended to be called from compose/local/django/start. + +Looks up / creates the Project by name (no slug field on Project) so running +this in a long-lived dev DB where PK 1 is already taken by a different project +doesn't conflict. 
+""" + +import logging +import os + +from django.contrib.auth import get_user_model +from django.core.management import call_command +from django.core.management.base import BaseCommand +from django.db import transaction + +from ami.main.models import Project + +logger = logging.getLogger(__name__) + +DEFAULT_PROJECT_NAME = "Default Project" + + +class Command(BaseCommand): + help = "Idempotently create a default superuser and project for local dev / CI." + + def add_arguments(self, parser): + parser.add_argument( + "--project-name", + default=os.environ.get("ANTENNA_DEFAULT_PROJECT_NAME", DEFAULT_PROJECT_NAME), + help="Project name to ensure exists (default: env ANTENNA_DEFAULT_PROJECT_NAME or 'Default Project')", + ) + + def handle(self, *args, **options): + User = get_user_model() + + try: + call_command("createsuperuser", interactive=False) + self.stdout.write(self.style.SUCCESS("Created superuser from DJANGO_SUPERUSER_* env vars")) + except Exception as e: + # createsuperuser raises CommandError if the user already exists; + # that's the idempotent path we want. + logger.info("Superuser createsuperuser call reported: %s", e) + + email = os.environ.get("DJANGO_SUPERUSER_EMAIL") + owner = User.objects.filter(email=email).first() if email else None + if owner is None: + self.stdout.write( + self.style.WARNING( + "No DJANGO_SUPERUSER_EMAIL env var (or user not found). " + "Project will be created without an owner." + ) + ) + + project_name = options["project_name"] + with transaction.atomic(): + project, created = Project.objects.get_or_create( + name=project_name, + defaults={"owner": owner, "description": "Bootstrap project for local dev and CI."}, + ) + + if created: + self.stdout.write(self.style.SUCCESS(f"Created project '{project_name}' (id={project.pk})")) + else: + self.stdout.write(f"Project '{project_name}' already exists (id={project.pk})") + + # Print in a stable, parseable format so shell wrappers can capture the + # ID. 
Compose files can't read command output — they use env vars — so + # the PS container reads ANTENNA_DEFAULT_PROJECT_NAME and resolves + # to a PK via the REST API rather than relying on PK being stable. + self.stdout.write(f"ANTENNA_DEFAULT_PROJECT_ID={project.pk}") diff --git a/compose/local/django/start b/compose/local/django/start index 5a2607cbd..edfe23269 100755 --- a/compose/local/django/start +++ b/compose/local/django/start @@ -6,6 +6,14 @@ set -o nounset python manage.py migrate +# Idempotent bootstrap for local dev and CI. Creates the default superuser +# (from DJANGO_SUPERUSER_* env vars) and a named project so processing-service +# containers can self-register against Antenna with no manual setup. +# Safe to run in production: gated behind ENSURE_DEFAULT_PROJECT=1. +if [ "${ENSURE_DEFAULT_PROJECT:-0}" = "1" ]; then + python manage.py ensure_default_project || echo "ensure_default_project failed, continuing" +fi + # Set USE_UVICORN=1 to use the original raw uvicorn dev server instead of gunicorn if [ "${USE_UVICORN:-0}" = "1" ]; then if [ "${DEBUGGER:-0}" = "1" ]; then diff --git a/docs/claude/planning/2026-04-17-minimal-worker-design.md b/docs/claude/planning/2026-04-17-minimal-worker-design.md new file mode 100644 index 000000000..b45592d16 --- /dev/null +++ b/docs/claude/planning/2026-04-17-minimal-worker-design.md @@ -0,0 +1,281 @@ +# Minimal worker (PSv2 pull-mode stub) — design + +**Date:** 2026-04-17 +**Status:** draft, initial stub +**Owner:** @mihow +**Related PRs:** +- RolnickLab/antenna#987 — job queue HTTP API (merged) +- RolnickLab/antenna#1194 — API key auth + `register.py` (open) +- RolnickLab/ami-data-companion#94 — ADC worker (merged) +- RolnickLab/ami-data-companion#136 — ADC API-key switch (open) + +## Intention + +Antenna supports two processing-service paradigms: + +1. **Push / "v1" / interactive** — a FastAPI service exposes `/info`, `/livez`, `/readyz`, `/process`. Antenna POSTs `PipelineRequest` to `/process` when a job runs. 
Good for single-image inference-demo UIs, `/api/v2/docs/` schema exposure, and admin smoke tests. This is the only mode the existing `processing_services/minimal/` and `processing_services/example/` containers support today. + +2. **Pull / "v2" / async / worker / consumer** — a long-running worker polls Antenna for work via the HTTP job-queue API (`POST /api/v2/jobs/{id}/tasks/`, `POST /api/v2/jobs/{id}/result/`) which Antenna proxies to a NATS JetStream queue. Workers can live behind firewalls, scale horizontally, run on GPUs without exposing any port. The only implementation today is the external [AMI Data Companion](https://github.com/RolnickLab/ami-data-companion) (ADC), which is heavyweight — conda environment, torch, real model weights, minutes of CUDA warmup. + +**The gap.** E2E testing and dev iteration on the Redis/RabbitMQ/NATS/Celery/Celery-Beat lifecycle is bottlenecked by having to spin up the ADC to exercise the pull path. There is no stub for v2 analogous to what `minimal/` is for v1. + +**The goal.** Add a minimal v2 worker stub that: + +- Mimics the ADC's HTTP-only interaction with Antenna so agents and CI exercise the real API contract. +- Uses deterministic stub pipelines (no torch, no model weights, no GPU) so a fresh `docker compose up` reaches a job-processing state in seconds, not minutes. +- Runs headless with zero manual setup, correctly sequenced so GitHub CI can use it the same way a developer uses it locally. +- Leaves the v1 push path intact for interactive and schema-exposure use cases. + +## Current state + +### What exists + +- `processing_services/minimal/` — v1 FastAPI stub. Two stub pipelines (`ConstantPipeline`, `RandomDetectionRandomSpeciesPipeline`). Used as the `ml_backend` service in `docker-compose.ci.yml` and alongside the main dev compose. +- `processing_services/example/` — v1 FastAPI with real zero-shot models. Intended as a template for real services. +- `ami/ml/schemas.py` — authoritative Pydantic schemas. 
v1 and v2 share `PipelineRequest` / `PipelineResultsResponse`. v2-specific: `PipelineProcessingTask`, `PipelineTaskResult`, `PipelineResultsError`, `ProcessingServiceClientInfo`. +- `ami/jobs/views.py::JobViewSet.tasks` (`POST /api/v2/jobs/{id}/tasks/`) — reserves a batch of tasks from NATS. +- `ami/jobs/views.py::JobViewSet.result` (`POST /api/v2/jobs/{id}/result/`) — accepts results, queues `process_nats_pipeline_result` Celery tasks. +- `ami/ml/views.py::ProjectPipelineViewSet.create` (`POST /api/v2/projects/{id}/pipelines/`) — registers pipelines for a pull-mode service. On main this expects `{"pipelines": [...], "processing_service_name": "..."}`. PR #1194 changes the auth and drops `processing_service_name` in favour of identification by API key. +- The ADC (external repo) is the only worker that exercises the pull path today. + +### What is in flight + +- **PR #1194** (Antenna): API-key auth for processing services, drops `processing_service_name` from pipeline registration, adds `client_info` to registration and tasks/result bodies, adds `processing_services/minimal/register.py` + `start.sh` for self-provisioning. Unmerged. +- **ADC PR #141**: Mothbot YOLO11m detector. +- **ADC PR #136**: switch ADC from user-token auth to API-key auth (companion to #1194). + +This design **targets main** (unmerged PRs are not prerequisites). When #1194 lands, the worker stub is a small diff away from API-key auth — see "Forward compatibility" below. + +## Desired state after this PR + +- `docker compose up` (or `docker compose -f docker-compose.ci.yml up`) reaches a state where an `async_api` job submitted against the `constant` or `random-detection-random-species` pipeline is picked up and processed to completion **with zero manual steps**. +- The existing v1 push behavior of `minimal/` continues to work unchanged for CI tests that hit `/process`. +- Agents have a documented, code-first reference for the v2 API contract they can extend when adding new features. 
+- The pull path exercises Redis state updates, NATS ACK via `reply_subject`, Celery `process_nats_pipeline_result`, `pipeline.save_results`, and the stale-job cutoff — the same code paths the ADC exercises in production. + +Out of scope: +- Real ML inference in the stub (stays random/constant). +- Renaming/rebuilding `example/` into `global_moths`/`complete` (follow-up). +- API-key-only auth (forward-compat, see below). +- Competing-consumer smoke tests, multi-worker tests, direct-NATS workers. + +## Design + +### Container architecture + +One image (`processing_services/minimal/`) runs in one of three modes, selected by the `MODE` env var: + +``` +MODE=api # FastAPI only (CI default; unchanged behavior) +MODE=worker # poll loop only +MODE=api+worker # FastAPI + register.py + poll loop (local dev default) +``` + +`start.sh` is the orchestrator. In `api+worker` it runs the FastAPI process in the background, runs `register.py` once (self-provisions a ProcessingService + registers pipelines), then starts the worker loop in the foreground. Signals propagate; if any child dies the container exits so the compose supervisor restarts it. + +### Directory layout + +``` +processing_services/minimal/ +├── Dockerfile # base image + start.sh as CMD +├── start.sh # MODE-dispatching orchestrator +├── main.py # unchanged FastAPI entry +├── register.py # self-provision + register pipelines (token-auth path now) +├── requirements.txt # unchanged (requests + pydantic already present) +├── .env.dev # dev-env defaults loaded by docker-compose.yml (MODE, creds, worker tuning) +├── api/ # v1 push-mode code + shared schemas +│ ├── api.py +│ ├── algorithms.py +│ ├── pipelines.py +│ ├── schemas.py # v1 schemas + v1/v2 shared schemas (PipelineResultsResponse, PipelineConfigResponse, ...) 
+│ └── utils.py +├── worker/ +│ ├── __init__.py +│ ├── client.py # requests.Session wrapper for Antenna REST +│ ├── loop.py # poll / reserve / process / submit loop +│ ├── runner.py # turn one PipelineProcessingTask into a PipelineTaskResult +│ └── schemas.py # v2-only schemas (PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest, ...) +└── worker_main.py # entry used by MODE=worker and the third child in MODE=api+worker +``` + +Schemas are split so the v1 FastAPI side doesn't have to know anything about pull-mode types. Shared classes that are wire-format on both sides (`PipelineResultsResponse`, `PipelineConfigResponse`, `SourceImageResponse`, etc.) stay in `api/schemas.py`; pull-mode-only types (`PipelineProcessingTask`, `PipelineTaskResult`, `PipelineResultsError`, `ProcessingServiceClientInfo`, `AsyncPipelineRegistrationRequest`) live in `worker/schemas.py`, which imports the shared ones from `api.schemas`. + +`worker/runner.py` imports from `api/pipelines.py` and `api/schemas.py` so stub detection/classification behavior is identical between v1 `/process` and v2 pull. No duplicated pipeline logic. + +### Wire format + +The worker speaks to Antenna over HTTP only. Direct NATS access from a worker is not supported (Antenna proxies NATS behind the HTTP endpoints). This matches the ADC contract and lets workers run behind firewalls. + +**Fetch active jobs:** +- `GET /api/v2/jobs/?pipeline=&status=STARTED&ids_only=true` for each of the stub's pipeline slugs. + +**Reserve tasks:** +- `POST /api/v2/jobs/{id}/tasks/` with `{"batch_size": 4}` → `{"tasks": [PipelineProcessingTask, ...]}` + +**Submit results:** +- `POST /api/v2/jobs/{id}/result/` with `{"results": [{"reply_subject": "...", "result": PipelineResultsResponse | PipelineResultsError}, ...]}` + +All 3 endpoints use `Authorization: Token ` for now. See "Forward compatibility" for API-key migration. 
+ +### Poll loop (pseudocode) + +```python +# Mode B: all-pipelines poll (this PR). +# TODO(follow-up): add `--pipeline ` flag (mode A) for slug-filtered poll, +# enables multiple workers as competing consumers for the same pipeline. +# TODO(follow-up): add `--job-id ` flag (mode C) for one-shot job-pinned runs, +# for test harnesses that want to drain and exit. +my_slugs = list(pipeline_choices.keys()) +while not shutdown: + did_work = False + for slug in my_slugs: + for job_id in client.list_active_jobs(slug): + tasks = client.reserve_tasks(job_id, batch_size=WORKER_BATCH_SIZE) + if not tasks: + continue + did_work = True + results = [runner.process(task, slug) for task in tasks] # exceptions → PipelineResultsError + client.submit_results(job_id, results) + if not did_work: + sleep(WORKER_POLL_INTERVAL_SECONDS) +``` + +Per-task errors are captured and posted as `PipelineResultsError` so the NATS ACK path still fires. That's important for exercising the retry / stale-job-cutoff / MaxRetriesExceeded paths (see CLAUDE.md on chaos testing). + +The outer iteration is per-slug rather than "list all jobs for all slugs at once" specifically to avoid a job→slug reverse-lookup: the slug is the outer loop variable, so `runner.process(task, slug)` gets it for free. Mirrors how the ADC worker (see comparison below) is typically run — pinned to a single slug per process. + +### Automation and sequencing + +`docker compose up` → worker loop polling actively in ~10s with no manual steps, despite the steps involved: + +1. Postgres, RabbitMQ, Redis, NATS come up. +2. Django applies migrations. +3. **An idempotent `ensure_default_project` management command runs from the Django `/start` script when `ENSURE_DEFAULT_PROJECT=1` is set in the env.** It ensures: + - The default user exists (`antenna@insectai.org`, password `localadmin`). Matches `.envs/.local/.django` and the values already baked into PR #1194's `register.py` defaults. 
- A project exists with a known name (`Default Project`; the Project model has no slug field). If one already exists with that name, no-op. If not, create it with the default user as owner. The command prints the seeded project's PK as `ANTENNA_DEFAULT_PROJECT_ID=...` for shell wrappers; the minimal worker container resolves the project by `ANTENNA_DEFAULT_PROJECT_NAME` via the REST API, since compose can't read command output into an env var. +4. The minimal container starts. `start.sh` launches FastAPI in background. +5. `register.py` waits for `http://localhost:2000/livez` (own server ready), then for `ANTENNA_API_URL/livez` (Antenna ready), then logs in, creates/finds a ProcessingService, registers pipelines. Has existing retry loop (10 attempts, 5s apart) for "project not found yet" etc. +6. `worker_main.py` starts polling. + +The **alternative** I considered and rejected was handling (3) in the processing service itself (e.g. `register.py` creates the project if missing). Rejected because: +- A "worker" component shouldn't have project-creation privileges in any realistic deployment. +- Django `/start` is the natural place for bootstrap data — migrations already run there. +- Makes the same seed logic available to any other local/CI flow that doesn't involve a processing service. + +The management command is gated by `ENSURE_DEFAULT_PROJECT=1` so it's opt-in. Set in `.envs/.local/.django` and `.envs/.ci/.django`, unset in `.envs/.production/*`. + +### Env var contract + +All env vars are read with `os.environ[...]` (no hard-coded fallbacks in Python code). The defaults below live in `processing_services/minimal/.env.dev`, which is loaded via `env_file:` in `processing_services/docker-compose.yml`. For a different deployment, copy `.env.dev` and point `env_file` at the copy, or set env vars in the container orchestrator of choice. 
+ +| Var | `.env.dev` value | Used by | Purpose | +|---|---|---|---| +| `MODE` | `api+worker` (dev) / `api` (Dockerfile default, so CI gets it) | `start.sh` | container entry mode | +| `ANTENNA_API_URL` | `http://django:8000` | register.py, worker | Antenna base URL | +| `ANTENNA_PROJECT_ID` | unset (resolved via name lookup) | register.py, worker | project to register + poll under | +| `ANTENNA_DEFAULT_PROJECT_NAME` | `Default Project` | register.py | project name to resolve when PROJECT_ID not set | +| `ANTENNA_USER` | `antenna@insectai.org` | register.py, worker | dev superuser (matches `.envs/.local/.django`) | +| `ANTENNA_PASSWORD` | `localadmin` | register.py, worker | dev superuser password | +| `ANTENNA_API_AUTH_TOKEN` | unset | register.py, worker | if set, skip login and use this token directly | +| `ANTENNA_API_KEY` | unset | register.py, worker | TODO (PR #1194): if set, use `Api-Key ` auth | +| `ANTENNA_SERVICE_NAME` | `minimal-worker-dev` | register.py | ProcessingService DB record label | +| `WORKER_POLL_INTERVAL_SECONDS` | `2.0` | worker | sleep when no active jobs | +| `WORKER_BATCH_SIZE` | `4` | worker | tasks per reserve call (matches ADC default) | +| `WORKER_REQUEST_TIMEOUT_SECONDS` | `30` | worker | per-HTTP-call timeout | +| `ENSURE_DEFAULT_PROJECT` | `1` in local/CI, unset elsewhere | Django `/start` | run idempotent seed command | + +### Docker compose changes + +**`processing_services/docker-compose.yml`** (local dev overlay): +- `ml_backend_minimal` gains `env_file: ./minimal/.env.dev` — a checked-in file that sets all the MODE/Antenna/worker env vars in one place. No inline `environment:` duplicates the defaults. +- `depends_on: django` (condition: `service_started`; the worker's register.py retries). + +**`docker-compose.ci.yml`:** +- `ml_backend` keeps `MODE=api` (CI default, unchanged). Existing `/process`-based tests keep working. 
+- *(Deferred)* Optionally add `ml_backend_worker` as a second service entry under a compose profile for a future CI async test. Keeping CI behavior unchanged in this PR. + +**Main `docker-compose.yml`** (Antenna stack): +- `django` service gets `ENSURE_DEFAULT_PROJECT=1` in its env (via `.envs/.local/.django`). + +### Forward compatibility with PR #1194 (API-key auth) + +`register.py` is written so the swap is small: + +- It checks `ANTENNA_API_KEY` first → uses `Api-Key` auth. +- Otherwise uses `ANTENNA_API_AUTH_TOKEN` → uses `Token` auth. +- Otherwise logs in with `ANTENNA_USER` / `ANTENNA_PASSWORD` to get a token. + +PR #1194 changes the request body (drops `processing_service_name`, adds `client_info`). We send both `processing_service_name` and a `client_info` dict now — the former is required by main, the latter is ignored by main (extra fields are allowed). When #1194 lands, a one-line change removes `processing_service_name` from the registration payload. + +The worker's tasks/result endpoints already support token auth today and will support Api-Key after #1194; same conditional header-selection logic. + +## Comparison with prior / external implementations + +### Against the ADC worker (external: `RolnickLab/ami-data-companion`) + +The ADC is the only production PSv2 worker today. This stub aims to exercise the same Antenna code paths without the ADC's runtime cost. + +**Mirrors:** +- HTTP-only interaction with Antenna. No direct NATS or RabbitMQ connection from the worker; Antenna proxies NATS JetStream behind `/jobs/{id}/tasks/` and `/jobs/{id}/result/`. +- Registration shape: `POST /api/v2/projects/{id}/pipelines/` with the same `AsyncPipelineRegistrationRequest` body. +- Task/result wire format: `PipelineProcessingTask` in, `PipelineTaskResult` out. Errors → `PipelineResultsError` so the `reply_subject` ACK still fires. +- Retry/backoff pattern: `requests.Session` + `urllib3.util.retry.Retry` on 5xx + network errors; no retry on 4xx. 
+- Client info forwarding: `ProcessingServiceClientInfo` (hostname, software, version) sent on each request. + +**Divergences:** +- **Pipeline scope per process.** ADC invocations pin to one pipeline via `ami worker --pipeline `. The stub iterates over all slugs it serves (mode B), because a test harness wants to submit jobs for any of the stub's pipelines and have them picked up without spawning multiple workers. TODOs in `loop.py` leave room for mode A (slug-filtered) and mode C (job-pinned) flags. +- **Model loading.** ADC loads real torch weights into GPU memory on startup; can take minutes. The stub loads no models — `ConstantPipeline` returns a fixed bounding box, `RandomDetectionRandomSpeciesPipeline` returns random ones. `docker compose up` → processing in seconds. +- **Runtime environment.** ADC runs under a conda env with torch/transformers/CUDA; setup is outside docker. The stub runs on the existing `python:3.11-slim` Dockerfile, zero extra deps beyond what `/process` already needs. +- **Job-queue implementation knowledge.** The ADC has a dedicated `trapdata.antenna` subpackage with `AntennaClient`, `AntennaConfig`, CLI wiring, and its own ACK bookkeeping. The stub collapses all that to ~100 lines in `worker/` because the stub doesn't need configuration flexibility — just enough to exercise the Antenna side. + +### Against PR #1011 (Celery-direct worker, never merged) + +PR #1011 (author: @vanessavmac) was an earlier attempt at PSv2 that took a different architectural path. It's worth explaining why the NATS/API approach wins for this stub's use case. + +**How #1011 worked:** +- Added a `celery_worker/` subdirectory to each processing service (`minimal/`, `example/`). +- That worker imported `celery` + `kombu` and connected **directly to Antenna's RabbitMQ broker** (`amqp://rabbituser:rabbitpass@rabbitmq:5672//`). +- Antenna dispatched ML work by enqueuing `process_pipeline_request` tasks onto `ml-pipeline-` queues. 
The PS celery worker was a Celery consumer bound to those queues. +- No HTTP between Antenna and the PS for task dispatch — Celery/RabbitMQ handled it. The PS still served `/info`/`/process` over HTTP for registration and v1 compatibility. +- Refactored `api.py` → `processing.py` to extract `process_pipeline_request`, shared by the HTTP endpoint and the Celery task. + +**Why NATS/API wins here:** +- **Deployability across trust boundaries.** Celery-direct requires the PS to have broker credentials and network access to RabbitMQ. That's fine inside a single `docker compose` stack but breaks down for a GPU fleet behind a firewall, an external partner's infrastructure, or anywhere the PS is outside the Antenna network perimeter. NATS/API gives the PS an HTTP-only surface with a per-service auth token (or API key post-#1194). +- **Coupling surface.** Celery-direct ties the PS to the exact broker, Celery version, queue routing conventions, and task signatures that Antenna uses. NATS/API exposes only the JSON shape of three endpoints. Easier to evolve on Antenna's side without breaking external workers. +- **Matches the ADC.** #1011 predates the ADC's PSv2 work; the ADC uses the NATS/API path, not Celery-direct. Converging on ADC's contract means one protocol for Antenna to support and one stub (this one) that mirrors what production workers actually do. +- **Competing consumers.** Celery/RabbitMQ's queue semantics do give you competing consumers for free. The NATS path gets the same property via JetStream's pull-subscribe — Antenna's `nats_queue.py` already wires this up. Not a differentiator either way. + +**What #1011 got right that this PR inherits:** +- Factor shared processing logic out of the FastAPI endpoint so both v1 and v2 call the same code. This PR does the same (see `api.api.pipeline_choices` used by both `/process` and `worker/runner.py`) — the factoring is slightly lighter-touch because the stub pipelines already expose `pipeline_choices` at module level. 
+ +## Testing strategy + +### What this PR verifies + +- `docker compose -f processing_services/docker-compose.yml build ml_backend_minimal` succeeds. +- `MODE=api` container is backward-compatible with existing CI (same `/process` / `/info` / `/livez` / `/readyz`). +- `MODE=api+worker` container: in a full local stack, `curl` submits an `async_api` job for the `constant` pipeline, and the worker processes it to completion. `Job.status` reaches `SUCCESS`, `SourceImage.detections` populated, `Occurrence` rows created. +- `ensure_default_project` is idempotent: running twice is a no-op. + +### What this PR does not verify + +- API-key auth path (depends on PR #1194). +- Chaos-testing path (Redis/NATS fault injection) — the stub should work with those but it's a follow-up to wire into `scripts/psv2_integration_test.sh`. +- GitHub CI for an async path — current CI only exercises `/process`. Follow-up: add an async-job test that uses `MODE=worker`. + +## Open questions and follow-ups + +1. **Rename `example/` → `complete` / `global_moths`?** Deferred. The "example" service is already the "real one" — once we have the mothbot/foundation-model pipeline bundled, we rename. Not in this PR. +2. **Should `example/` also learn pull mode?** Yes, but as a second follow-up. The pattern established by the `worker/` subdirectory is directly reusable. +3. **`ANTENNA_DEFAULT_PROJECT_ID=1` conflicts:** Ensure command looks up by slug, not PK. PK 1 may already be claimed in long-lived dev DBs; we export whatever PK the seeded project actually has. +4. **Healthchecks for sequencing:** PR doesn't add Docker healthchecks for Django; register.py's retry loop is the sole mechanism for "wait for Antenna ready". Could add a healthcheck in a follow-up if retry noise becomes a problem. +5. **Worker heartbeat.** On main, ProcessingService.last_seen is updated only when the pipelines-registration endpoint is called. 
PR #1194 adds heartbeat-on-every-request via `_mark_pipeline_pull_services_seen` in `ami/jobs/views.py`. After #1194 lands, workers show up as "online" in the Antenna admin automatically. + +## Links + +- Antenna job queue API: `ami/jobs/views.py::JobViewSet.tasks`, `.result` +- Schemas: `ami/ml/schemas.py` (`PipelineProcessingTask`, `PipelineTaskResult`, `ProcessingServiceClientInfo`) +- Celery result handler: `ami/jobs/tasks.py::process_nats_pipeline_result` +- Async state manager: `ami/ml/orchestration/async_job_state.py` +- Task queue: `ami/ml/orchestration/nats_queue.py` +- ADC reference: https://github.com/RolnickLab/ami-data-companion/tree/main/trapdata/antenna diff --git a/processing_services/README.md b/processing_services/README.md index 48bb254e7..1b0046907 100644 --- a/processing_services/README.md +++ b/processing_services/README.md @@ -16,6 +16,31 @@ In this directory, we define locally-run processing services as FastAPI apps. A If your goal is to run an ML backend locally, simply copy the `example` app and follow the steps below. +## v1 vs v2 processing services + +Antenna supports two processing-service paradigms: + +- **v1 (push / synchronous):** the service exposes `/info`, `/livez`, `/readyz`, `/process`. Antenna POSTs a `PipelineRequest` to `/process` when a job runs. Good for interactive/playground requests on a single image and for exposing pipeline schema at `/api/v2/docs/`. +- **v2 (pull / async / worker):** a worker polls Antenna's job-queue HTTP endpoints (`POST /api/v2/jobs/{id}/tasks/`, `POST /api/v2/jobs/{id}/result/`) and processes tasks queued by Antenna in NATS JetStream. Antenna proxies NATS internally so the worker only talks HTTP. Good for long-running workloads, firewall-isolated GPU fleets, and horizontal scaling. + +The `minimal` container supports **both modes** and is driven by a `MODE` env var: + +- `MODE=api` (default; what CI uses) — v1 FastAPI only. +- `MODE=worker` — v2 worker poll loop only. 
+- `MODE=api+worker` — both, in one container. Used by `processing_services/docker-compose.yml` for local dev. Exercises the real v2 pull path (Redis / NATS / Celery / Celery Beat) with lightweight stub pipelines, no ML dependencies. + +See `docs/claude/planning/2026-04-17-minimal-worker-design.md` for the design. + +### Running the v2 path locally + +```bash +docker compose up -d # Antenna core +docker compose -f processing_services/docker-compose.yml up -d # minimal container in api+worker mode +``` + +The minimal container self-registers a `ProcessingService` (no `endpoint_url`, async-mode) against the "Default Project" seeded by `ami/main/management/commands/ensure_default_project.py`. Submit an `async_api` job for the `constant` or `random-detection-random-species` pipeline and the worker picks it up. + + ## Environment Set Up 1. Update `processing_services/example/requirements.txt` with required packages (i.e. PyTorch, etc) diff --git a/processing_services/docker-compose.yml b/processing_services/docker-compose.yml index 91a21c100..a8fe4a5bf 100644 --- a/processing_services/docker-compose.yml +++ b/processing_services/docker-compose.yml @@ -10,6 +10,11 @@ services: - minio:host-gateway networks: - antenna_network + # start.sh dispatches on MODE: api | worker | api+worker. For api+worker, + # start.sh runs FastAPI first (for register.py to read /info), then + # register.py, then the worker poll loop. + env_file: + - ./minimal/.env.dev ml_backend_example: build: diff --git a/processing_services/minimal/.env.dev b/processing_services/minimal/.env.dev new file mode 100644 index 000000000..213eadd01 --- /dev/null +++ b/processing_services/minimal/.env.dev @@ -0,0 +1,29 @@ +# Dev defaults for the minimal processing service container. +# +# Loaded via `env_file:` in processing_services/docker-compose.yml when you run +# the stub locally against a fresh Antenna stack. 
These are the same creds +# seeded by ami/main/management/commands/ensure_default_project.py and +# .envs/.local/.django — intentionally not secret. +# +# Copy to .env.minimal-worker and edit to point at a different Antenna +# deployment. CI loads this same file. + +# ── Container mode ────────────────────────────────────────────────────────── +# api FastAPI only (default in Dockerfile; used by docker-compose.ci.yml) +# worker poll loop only +# api+worker both, plus register.py on boot — used by local dev compose below +MODE=api+worker + +# ── Antenna target ───────────────────────────────────────────────────────── +ANTENNA_API_URL=http://django:8000 +ANTENNA_DEFAULT_PROJECT_NAME=Default Project +ANTENNA_SERVICE_NAME=minimal-worker-dev + +# ── Auth (user/password fallback; overridden by ANTENNA_API_KEY / _AUTH_TOKEN if set) ── +ANTENNA_USER=antenna@insectai.org +ANTENNA_PASSWORD=localadmin + +# ── Worker tuning ────────────────────────────────────────────────────────── +WORKER_POLL_INTERVAL_SECONDS=2.0 +WORKER_BATCH_SIZE=4 +WORKER_REQUEST_TIMEOUT_SECONDS=30 diff --git a/processing_services/minimal/Dockerfile b/processing_services/minimal/Dockerfile index 0686b4471..d26ed17ae 100644 --- a/processing_services/minimal/Dockerfile +++ b/processing_services/minimal/Dockerfile @@ -6,4 +6,9 @@ COPY . /app RUN pip install -r ./requirements.txt -CMD ["python", "/app/main.py"] +# start.sh chooses between FastAPI, worker, or both based on $MODE. +# Default to api (FastAPI push-mode) for backward compatibility with existing CI. 
+ENV MODE=api +RUN chmod +x /app/start.sh + +CMD ["/app/start.sh"] diff --git a/processing_services/minimal/api/schemas.py b/processing_services/minimal/api/schemas.py index b0febba1b..3481b763c 100644 --- a/processing_services/minimal/api/schemas.py +++ b/processing_services/minimal/api/schemas.py @@ -291,3 +291,9 @@ class ProcessingServiceInfoResponse(pydantic.BaseModel): # default=list, # examples=[RANDOM_BINARY_CLASSIFIER], # ) + + +# v2 pull-mode schemas (PipelineProcessingTask, PipelineTaskResult, +# ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest, ...) live in +# `processing_services/minimal/worker/schemas.py` since only the worker path +# uses them. The v1 schemas above are shared by both push and pull. diff --git a/processing_services/minimal/register.py b/processing_services/minimal/register.py new file mode 100644 index 000000000..b8e4467fe --- /dev/null +++ b/processing_services/minimal/register.py @@ -0,0 +1,214 @@ +""" +Self-register this processing service's pipelines with Antenna. + +What this does, in order: + 1. Resolve an Authorization header (env var or fallback login). + 2. Resolve the target project id (env var, else look up by name). + 3. Fetch our own /info to get the list of pipelines this container serves. + 4. POST that list to `/api/v2/projects/{id}/pipelines/` so Antenna knows which + async pipelines this ProcessingService can handle. + +About identity: on main, the server looks up / creates a `ProcessingService` +record by the `processing_service_name` field in the request body, and grants +write access based on the Authorization header's user. PR #1194 changes that +to use API keys — the PS record is derived from the key itself and +`processing_service_name` is no longer sent. We tolerate both by sending +`processing_service_name` now; #1194-enabled Antenna will ignore the field and +pick the PS from the key. + +Env vars are read via `os.environ[...]` without fallbacks — the .env file is +expected to provide them. 
See `processing_services/.env.example`. +""" + +import logging +import os +import sys +import time + +import requests +from api.api import pipelines as pipeline_classes # type: ignore[import-not-found] +from api.schemas import PipelineConfigResponse # type: ignore[import-not-found] +from worker.schemas import ( # type: ignore[import-not-found] + AsyncPipelineRegistrationRequest, + ProcessingServiceClientInfo, +) + +logger = logging.getLogger(__name__) + +MAX_RETRIES = 20 +RETRY_DELAY = 3 # seconds + +CACHED_AUTH_HEADER_PATH = "/tmp/antenna_auth_header" + + +def get_client_info() -> ProcessingServiceClientInfo: + """Identity metadata sent to Antenna in the registration body. + + `ProcessingServiceClientInfo` has `extra="allow"`, so any keys here are + forwarded verbatim. On main the registration serializer ignores unknown + fields; PR #1194 consumes this field. + """ + import platform + import socket + + return ProcessingServiceClientInfo.model_validate( + { + "hostname": socket.gethostname(), + "software": "antenna-minimal-worker", + "version": "0.1.0", + "platform": platform.platform(), + } + ) + + +def auth_header() -> dict[str, str] | None: + """Pick an auth header based on what env vars are set, or None to trigger login flow.""" + api_key = os.environ.get("ANTENNA_API_KEY") + if api_key: + # PR #1194 path. Harmless on main — main ignores unknown auth schemes + # and falls through to the next header, which we don't send. 
+ return {"Authorization": f"Api-Key {api_key}"} + + token = os.environ.get("ANTENNA_API_AUTH_TOKEN") + if token: + return {"Authorization": f"Token {token}"} + + return None + + +def login(api_url: str, email: str, password: str) -> str: + resp = requests.post( + f"{api_url}/api/v2/auth/token/login/", + json={"email": email, "password": password}, + timeout=10, + ) + resp.raise_for_status() + token = resp.json().get("auth_token") + if not token: + raise ValueError(f"Login returned no auth_token: {resp.json()}") + return token + + +def resolve_project_id(api_url: str, headers: dict[str, str]) -> str: + """Return ANTENNA_PROJECT_ID if set, else look up by name via REST API.""" + explicit = os.environ.get("ANTENNA_PROJECT_ID") + if explicit: + return explicit + + name = os.environ["ANTENNA_DEFAULT_PROJECT_NAME"] + resp = requests.get(f"{api_url}/api/v2/projects/", headers=headers, timeout=10) + resp.raise_for_status() + for project in resp.json().get("results", []): + if project.get("name") == name: + pk = str(project["id"]) + logger.info("Resolved project '%s' to id=%s", name, pk) + return pk + raise RuntimeError(f"No project found with name '{name}' — ensure_default_project should have created it") + + +def fetch_own_pipelines() -> list[PipelineConfigResponse]: + """Return the pipeline configs this container serves. + + Imported directly from the api module rather than fetched over HTTP from + the co-located FastAPI service — register.py runs in the same container, + and importing avoids having to wait for FastAPI to be up (which it isn't + in MODE=worker). + """ + return [p.config for p in pipeline_classes] + + +def register( + api_url: str, + project_id: str, + headers: dict[str, str], + pipelines: list[PipelineConfigResponse], +) -> None: + """POST pipelines to the project registration endpoint. + + Sends the schema-defined `AsyncPipelineRegistrationRequest` body. 
We also + attach a `client_info` field, which is ignored on main (unknown field) and + read by PR #1194. + """ + service_name = os.environ["ANTENNA_SERVICE_NAME"] + body = AsyncPipelineRegistrationRequest( + processing_service_name=service_name, + pipelines=pipelines, + ).model_dump(mode="json") + body["client_info"] = get_client_info().model_dump(mode="json") + + url = f"{api_url}/api/v2/projects/{project_id}/pipelines/" + resp = requests.post(url, json=body, headers=headers, timeout=30) + if resp.status_code in (200, 201): + logger.info( + "Registered %d pipelines as '%s' (project=%s)", + len(pipelines), + service_name, + project_id, + ) + return + raise RuntimeError(f"Registration failed: {resp.status_code} {resp.text}") + + +def main() -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + api_url = os.environ["ANTENNA_API_URL"] + + pipelines = fetch_own_pipelines() + if not pipelines: + logger.warning("No pipelines found from local /info; nothing to register") + return 0 + + # Auth: use explicit header if provided, else log in. + headers = auth_header() + if headers is None: + email = os.environ["ANTENNA_USER"] + password = os.environ["ANTENNA_PASSWORD"] + # Retry login so we tolerate "Django not up yet". + for attempt in range(MAX_RETRIES): + try: + token = login(api_url, email, password) + headers = {"Authorization": f"Token {token}"} + break + except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e: + logger.info("Login retry (%d/%d): %s", attempt + 1, MAX_RETRIES, e) + time.sleep(RETRY_DELAY) + else: + logger.error("Could not authenticate after %d attempts", MAX_RETRIES) + return 1 + + # Resolve project id (may also need to wait for ensure_default_project to run). 
+    project_id = None
+    for attempt in range(MAX_RETRIES):
+        try:
+            project_id = resolve_project_id(api_url, headers)
+            break
+        except (requests.ConnectionError, requests.Timeout, RuntimeError) as e:
+            logger.info("Project lookup retry (%d/%d): %s", attempt + 1, MAX_RETRIES, e)
+            time.sleep(RETRY_DELAY)
+    if project_id is None:
+        logger.error("Could not resolve project after %d attempts", MAX_RETRIES)
+        return 1
+
+    # Cache the auth header to a file for worker_main.py. register.py runs as its
+    # own process (see start.sh), so writing os.environ here would never reach it.
+    with open(CACHED_AUTH_HEADER_PATH, "w") as f:
+        f.write(next(iter(headers.values())))
+
+    for attempt in range(MAX_RETRIES):
+        try:
+            register(api_url, project_id, headers, pipelines)
+            return 0
+        except (requests.ConnectionError, requests.Timeout) as e:
+            logger.info("Registration retry (%d/%d): %s", attempt + 1, MAX_RETRIES, e)
+            time.sleep(RETRY_DELAY)
+        except RuntimeError as e:
+            logger.error("%s", e)
+            return 1
+
+    logger.error("Registration failed after %d attempts", MAX_RETRIES)
+    return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/processing_services/minimal/start.sh b/processing_services/minimal/start.sh
new file mode 100755
index 000000000..18b35d0e6
--- /dev/null
+++ b/processing_services/minimal/start.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+# Container entrypoint. Dispatches one of three modes via the MODE env var:
+#
+#   MODE=api         FastAPI server only (default; CI uses this for existing /process tests)
+#   MODE=worker      Poll loop only (register.py runs once, then worker_main.py polls)
+#   MODE=api+worker  FastAPI + register.py + worker loop (local dev default; exercises both v1 and v2)
+#
+# See docs/claude/planning/2026-04-17-minimal-worker-design.md for the rationale.
+
+set -e
+
+MODE="${MODE:-api}"
+
+# Signal forwarding so compose can stop us cleanly.
+PIDS=() +cleanup() { + for pid in "${PIDS[@]}"; do + kill -TERM "$pid" 2>/dev/null || true + done + wait "${PIDS[@]}" 2>/dev/null || true +} +trap cleanup TERM INT + +start_api() { + echo "[start.sh] Starting FastAPI server (MODE=$MODE)" + python /app/main.py & + PIDS+=("$!") +} + +start_register() { + # register.py imports pipelines from the api module directly (no HTTP to + # self), self-provisions a ProcessingService, and registers pipelines + # with Antenna. Its retry loop handles "Antenna not up yet". + # Skip if registration env vars aren't set — the container still works + # as a pure v1 push service without them. + if [ -z "${ANTENNA_API_URL:-}" ]; then + echo "[start.sh] Skipping registration (ANTENNA_API_URL not set)" + return + fi + if [ -z "${ANTENNA_PROJECT_ID:-}" ] && [ -z "${ANTENNA_DEFAULT_PROJECT_NAME:-}" ]; then + echo "[start.sh] Skipping registration (no ANTENNA_PROJECT_ID or ANTENNA_DEFAULT_PROJECT_NAME)" + return + fi + echo "[start.sh] Running registration" + python /app/register.py || echo "[start.sh] Registration failed, continuing" +} + +start_worker() { + echo "[start.sh] Starting worker poll loop" + python /app/worker_main.py & + PIDS+=("$!") +} + +case "$MODE" in + api) + start_api + ;; + worker) + start_register + start_worker + ;; + api+worker) + # FastAPI first so /process is available even while register.py is + # still retrying against a not-yet-ready Antenna. register.py doesn't + # depend on FastAPI (imports pipeline configs directly), so there's + # no startup race here. + start_api + start_register + start_worker + ;; + *) + echo "[start.sh] Unknown MODE: $MODE (expected: api | worker | api+worker)" >&2 + exit 2 + ;; +esac + +# Block on any child; if any exits, cleanup and exit so compose restarts us. 
+# Capture the first-exiting child's status without tripping `set -e` (a bare
+# `wait -n` returning non-zero would abort the script before cleanup runs),
+# then stop the remaining children and propagate that status so compose sees
+# the real failure and can restart us.
+wait -n "${PIDS[@]}" && STATUS=0 || STATUS=$?
+cleanup; exit "$STATUS"
diff --git a/processing_services/minimal/worker/__init__.py b/processing_services/minimal/worker/__init__.py
new file mode 100644
index 000000000..789c94e82
--- /dev/null
+++ b/processing_services/minimal/worker/__init__.py
@@ -0,0 +1,5 @@
+"""
+v2 pull-mode worker for the minimal processing service.
+
+See docs/claude/planning/2026-04-17-minimal-worker-design.md.
+"""
diff --git a/processing_services/minimal/worker/client.py b/processing_services/minimal/worker/client.py
new file mode 100644
index 000000000..be051a7cc
--- /dev/null
+++ b/processing_services/minimal/worker/client.py
@@ -0,0 +1,119 @@
+"""
+HTTP client for the Antenna job-queue REST API.
+
+Thin wrapper around requests.Session with a single retry policy and a single
+auth header. All three endpoints (list active jobs, reserve tasks, submit
+results) are thin wrappers around one POST or GET call — no attempt at
+connection pooling tricks beyond what Session gives for free.
+"""
+
+from __future__ import annotations
+
+import logging
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from .schemas import PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo, TasksResponse
+
+logger = logging.getLogger(__name__)
+
+
+class AntennaClient:
+    def __init__(self, api_url: str, auth_header: str, timeout: float = 30.0) -> None:
+        self.api_url = api_url.rstrip("/")
+        self.timeout = timeout
+        self.session = requests.Session()
+        self.session.headers.update({"Authorization": auth_header})
+
+        # Retry only on 5xx and network-level failures. 4xx is a programming
+        # error we want to see immediately, not paper over.
+ retry = Retry( + total=3, + backoff_factor=0.5, + status_forcelist=[500, 502, 503, 504], + allowed_methods=frozenset(["GET", "POST"]), + raise_on_status=False, + ) + adapter = HTTPAdapter(max_retries=retry) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + def list_active_jobs(self, pipeline_slug: str) -> list[int]: + """ + Find STARTED job ids for a single pipeline slug. + + Calls GET /jobs/?pipeline__slug=&status=STARTED&ids_only=true. + The server's `pipeline` filter expects a DB id; `pipeline__slug` is + the slug-based alias exposed by JobFilterSet. + """ + try: + resp = self.session.get( + f"{self.api_url}/api/v2/jobs/", + params={"pipeline__slug": pipeline_slug, "status": "STARTED", "ids_only": "true"}, + timeout=self.timeout, + ) + resp.raise_for_status() + except requests.RequestException as e: + logger.warning("list_active_jobs failed for pipeline=%s: %s", pipeline_slug, e) + return [] + + payload = resp.json() + # `ids_only=true` returns a flat list. Some list endpoints return + # {"results": [...]}; handle both. + entries = payload if isinstance(payload, list) else payload.get("results", []) + ids: set[int] = set() + for entry in entries: + if isinstance(entry, dict) and "id" in entry: + ids.add(int(entry["id"])) + elif isinstance(entry, int): + ids.add(entry) + return sorted(ids) + + def reserve_tasks( + self, + job_id: int, + batch_size: int, + client_info: ProcessingServiceClientInfo | None = None, + ) -> list[PipelineProcessingTask]: + """ + POST /jobs/{id}/tasks/ — reserve up to batch_size tasks from the NATS + queue for the given job. Antenna proxies NATS internally; we never + touch NATS from here. 
+ """ + body: dict = {"batch_size": batch_size} + if client_info is not None: + body["client_info"] = client_info.model_dump(mode="json") + resp = self.session.post( + f"{self.api_url}/api/v2/jobs/{job_id}/tasks/", + json=body, + timeout=self.timeout, + ) + if resp.status_code == 503: + logger.info("Task queue temporarily unavailable for job %s", job_id) + return [] + resp.raise_for_status() + return TasksResponse.model_validate(resp.json()).tasks + + def submit_results( + self, + job_id: int, + results: list[PipelineTaskResult], + client_info: ProcessingServiceClientInfo | None = None, + ) -> dict: + """ + POST /jobs/{id}/result/ — deliver a list of PipelineTaskResult items. + + Antenna queues one Celery task per result for async processing. + """ + body: dict = {"results": [r.model_dump(mode="json") for r in results]} + if client_info is not None: + body["client_info"] = client_info.model_dump(mode="json") + resp = self.session.post( + f"{self.api_url}/api/v2/jobs/{job_id}/result/", + json=body, + timeout=self.timeout, + ) + resp.raise_for_status() + return resp.json() diff --git a/processing_services/minimal/worker/loop.py b/processing_services/minimal/worker/loop.py new file mode 100644 index 000000000..dace226d7 --- /dev/null +++ b/processing_services/minimal/worker/loop.py @@ -0,0 +1,83 @@ +""" +Worker poll loop. Mode B (all-pipelines) per the design doc. + +Iterates each of this container's registered pipeline slugs in turn, so the +slug for any given job is always the outer loop variable — no reverse lookup +from job_id → slug needed. + +# TODO(follow-up): add --pipeline flag (mode A) so multiple workers can +# run side-by-side as competing consumers for the same pipeline. +# TODO(follow-up): add --job-id flag (mode C) for one-shot drain-and-exit +# runs, for test_ml_job_e2e-style harnesses. 
+""" + +from __future__ import annotations + +import logging +import os +import signal +import time + +from api.api import pipeline_choices # type: ignore[import-not-found] + +from .client import AntennaClient +from .runner import process_task +from .schemas import ProcessingServiceClientInfo + +logger = logging.getLogger(__name__) + + +class Loop: + def __init__(self, client: AntennaClient, client_info: ProcessingServiceClientInfo) -> None: + self.client = client + self.client_info = client_info + self.shutdown = False + self.poll_interval = float(os.environ["WORKER_POLL_INTERVAL_SECONDS"]) + self.batch_size = int(os.environ["WORKER_BATCH_SIZE"]) + + def _install_signal_handlers(self) -> None: + def _stop(signum, frame): # noqa: ARG001 + logger.info("Received signal %s, shutting down", signum) + self.shutdown = True + + signal.signal(signal.SIGTERM, _stop) + signal.signal(signal.SIGINT, _stop) + + def run(self) -> None: + self._install_signal_handlers() + logger.info("Polling for jobs on pipelines: %s", list(pipeline_choices)) + + while not self.shutdown: + try: + iterated = self._iterate() + except Exception: + # Log full traceback and keep going; a faulty poll shouldn't kill the worker. + logger.exception("Poll iteration failed") + iterated = False + if not iterated: + time.sleep(self.poll_interval) + + def _iterate(self) -> bool: + """One poll cycle across all registered pipeline slugs. 
Returns True if any work was done.""" + did_work = False + for slug in pipeline_choices: + if self.shutdown: + break + job_ids = self.client.list_active_jobs(slug) + for job_id in job_ids: + if self.shutdown: + break + tasks = self.client.reserve_tasks(job_id, self.batch_size, client_info=self.client_info) + if not tasks: + continue + did_work = True + results = [process_task(task, slug) for task in tasks] + ack = self.client.submit_results(job_id, results, client_info=self.client_info) + logger.info( + "Job %s (%s): processed %d task(s), results_queued=%s", + job_id, + slug, + len(tasks), + ack.get("results_queued"), + ) + return did_work diff --git a/processing_services/minimal/worker/runner.py b/processing_services/minimal/worker/runner.py new file mode 100644 index 000000000..0f0a20d63 --- /dev/null +++ b/processing_services/minimal/worker/runner.py @@ -0,0 +1,70 @@ +""" +Turn a single PipelineProcessingTask into a PipelineTaskResult by running one +of the stub pipelines from processing_services/minimal/api/pipelines.py. + +Pipelines and algorithms are imported from `api.*` so that the behavior of the +stub is identical between: + - v1 POST /process (Antenna sends a PipelineRequest) + - v2 worker (this file — builds a synthetic one-image run) + +Any exception raised during processing is converted to a PipelineResultsError +so the reply_subject ACK still fires downstream. +""" + +from __future__ import annotations + +import logging +import time + +from api.api import pipeline_choices # type: ignore[import-not-found] +from api.schemas import PipelineResultsResponse, SourceImage, SourceImageResponse # type: ignore[import-not-found] + +from .schemas import PipelineProcessingTask, PipelineResultsError, PipelineTaskResult + +logger = logging.getLogger(__name__) + + +def process_task(task: PipelineProcessingTask, pipeline_slug: str) -> PipelineTaskResult: + """ + Run the stub pipeline against a single image task. 
+ """ + # The reserve_tasks endpoint always returns a reply_subject for v2 tasks; + # if we somehow got None, the server's ACK path won't fire — we still + # return a result so the caller can log it. + reply_subject = task.reply_subject or "" + + try: + PipelineCls = pipeline_choices[pipeline_slug] + except KeyError: + return _error_result(reply_subject, task.image_id, f"Unknown pipeline slug '{pipeline_slug}'") + + try: + source_image = SourceImage(id=task.image_id, url=task.image_url) + source_image.open(raise_exception=True) + except Exception as e: + return _error_result(reply_subject, task.image_id, f"Failed to open image: {e}") + + start = time.time() + try: + pipeline = PipelineCls(source_images=[source_image], existing_detections=[]) + detections = pipeline.run() + except Exception as e: + logger.exception("Pipeline run failed for task %s", task.id) + return _error_result(reply_subject, task.image_id, f"Pipeline run failed: {e}") + elapsed = time.time() - start + + result = PipelineResultsResponse( + pipeline=pipeline_slug, # type: ignore[arg-type] # already validated via pipeline_choices lookup + total_time=elapsed, + source_images=[SourceImageResponse(id=source_image.id, url=source_image.url or "")], + detections=detections, + ) + return PipelineTaskResult(reply_subject=reply_subject, result=result) + + +def _error_result(reply_subject: str, image_id: str, msg: str) -> PipelineTaskResult: + logger.warning("Task error (image_id=%s): %s", image_id, msg) + return PipelineTaskResult( + reply_subject=reply_subject, + result=PipelineResultsError(error=msg, image_id=image_id), + ) diff --git a/processing_services/minimal/worker/schemas.py b/processing_services/minimal/worker/schemas.py new file mode 100644 index 000000000..9a6c0c7cf --- /dev/null +++ b/processing_services/minimal/worker/schemas.py @@ -0,0 +1,70 @@ +""" +v2 pull-mode (worker) Pydantic schemas. + +Mirror of the corresponding classes in ami/ml/schemas.py. 
These live in this +package — separate from api/schemas.py — because they're only used by the +async worker, not by the v1 push FastAPI service. The v1 schemas that are +shared by both paths (PipelineResultsResponse, PipelineConfigResponse, etc.) +stay in api/schemas.py and are imported here. + +Keep these in sync with Antenna's canonical schemas when they evolve — +field-for-field parity matters for correct JSON round-trips. +""" + +from __future__ import annotations + +import pydantic +from api.schemas import PipelineConfigResponse, PipelineResultsResponse # type: ignore[import-not-found] + + +class PipelineResultsError(pydantic.BaseModel): + """Error result when pipeline processing fails for a single task.""" + + error: str + image_id: str | None = None + + +class PipelineProcessingTask(pydantic.BaseModel): + """A single image task reserved from the async job queue. + + `reply_subject` is the NATS subject Antenna ACKs on when the result comes + back — the worker must round-trip it verbatim in the matching + PipelineTaskResult. + """ + + id: str + image_id: str + image_url: str + reply_subject: str | None = None + + +class TasksResponse(pydantic.BaseModel): + """Response body of `POST /api/v2/jobs/{id}/tasks/`.""" + + tasks: list[PipelineProcessingTask] = [] + + +class PipelineTaskResult(pydantic.BaseModel): + """Result of processing a single PipelineProcessingTask.""" + + reply_subject: str + result: PipelineResultsResponse | PipelineResultsError + + +class ProcessingServiceClientInfo(pydantic.BaseModel): + """Identity metadata sent by a processing service worker. + + A ProcessingService in the DB may have multiple physical workers running + simultaneously; this lets the server distinguish them. Fields are + intentionally open — processing services can send any useful key/value + pairs (hostname, software version, pod name, etc). 
+ """ + + model_config = pydantic.ConfigDict(extra="allow") + + +class AsyncPipelineRegistrationRequest(pydantic.BaseModel): + """Body for `POST /api/v2/projects/{id}/pipelines/` from an async processing service.""" + + processing_service_name: str + pipelines: list[PipelineConfigResponse] = [] diff --git a/processing_services/minimal/worker_main.py b/processing_services/minimal/worker_main.py new file mode 100644 index 000000000..f060c363b --- /dev/null +++ b/processing_services/minimal/worker_main.py @@ -0,0 +1,97 @@ +""" +Entry point for MODE=worker and for the worker child in MODE=api+worker. + +Expects register.py to have run first and written the resolved auth token to +/tmp/antenna_auth_header. If the file is absent (e.g. someone runs the worker +without registration), we fall back to the same env-var-based auth flow. +All env vars required for auth must be provided via the .env file — this +module does not hard-code dev defaults. +""" + +from __future__ import annotations + +import logging +import os +import platform +import socket +import sys +import time + +import requests +from worker.schemas import ProcessingServiceClientInfo # type: ignore[import-not-found] + +LOG = logging.getLogger(__name__) + +CACHED_AUTH_HEADER_PATH = "/tmp/antenna_auth_header" +MAX_LOGIN_ATTEMPTS = 20 +LOGIN_RETRY_DELAY = 3 # seconds + + +def _load_auth_header() -> str: + if os.path.exists(CACHED_AUTH_HEADER_PATH): + with open(CACHED_AUTH_HEADER_PATH) as f: + raw = f.read().strip() + if raw: + return raw + + api_key = os.environ.get("ANTENNA_API_KEY") + if api_key: + return f"Api-Key {api_key}" + + token = os.environ.get("ANTENNA_API_AUTH_TOKEN") + if token: + return f"Token {token}" + + # No cached header, no API key, no static token → log in with user/password. 
+ email = os.environ["ANTENNA_USER"] + password = os.environ["ANTENNA_PASSWORD"] + api_url = os.environ["ANTENNA_API_URL"] + for attempt in range(MAX_LOGIN_ATTEMPTS): + try: + r = requests.post( + f"{api_url}/api/v2/auth/token/login/", + json={"email": email, "password": password}, + timeout=10, + ) + r.raise_for_status() + return f"Token {r.json()['auth_token']}" + except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e: + LOG.info("Worker login retry %d/%d: %s", attempt + 1, MAX_LOGIN_ATTEMPTS, e) + time.sleep(LOGIN_RETRY_DELAY) + raise RuntimeError("Worker could not authenticate with Antenna") + + +def _client_info() -> ProcessingServiceClientInfo: + """Metadata sent alongside each task/result request. + + `ProcessingServiceClientInfo` has `extra="allow"`, so any fields here are + forwarded to Antenna verbatim. + """ + return ProcessingServiceClientInfo.model_validate( + { + "hostname": socket.gethostname(), + "software": "antenna-minimal-worker", + "version": "0.1.0", + "platform": platform.platform(), + } + ) + + +def main() -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + api_url = os.environ["ANTENNA_API_URL"] + + auth = _load_auth_header() + + # Late import so tests can run without the worker-side api deps on the path. + from worker.client import AntennaClient + from worker.loop import Loop + + client = AntennaClient(api_url, auth, timeout=float(os.environ["WORKER_REQUEST_TIMEOUT_SECONDS"])) + loop = Loop(client, _client_info()) + loop.run() + return 0 + + +if __name__ == "__main__": + sys.exit(main())