feat: assign different categories of background tasks to separate queues #1257
Conversation
…tarvation

Previously all Celery tasks shared a single `antenna` queue, so a burst of high-volume tasks could block lower-volume ones on the same worker pool. Observed scenario: a ~740-image async_api job emitted ~180 `process_nats_pipeline_result` tasks per 5 min and starved `run_job` invocations behind it, leaving newly submitted jobs stuck in PENDING for many minutes. Long-running `run_job` tasks can similarly hold worker slots and delay beat/housekeeping tasks.

Split into three queues, each with its own worker service:

- `antenna` (default): beat tasks, cache refresh, sync, housekeeping
- `jobs`: `run_job` (can hold a slot for hours)
- `ml_results`: `process_nats_pipeline_result` + `save_results` bursts

The worker start script now takes `CELERY_QUEUES` as an env var (default: `antenna`) so one image serves all three services. Worker-only hosts (ami-worker-2, ami-worker-3) consume all three queues as spillover capacity via `docker-compose.worker.yml`.

Relates to #1256 (job logging bottleneck) and #1026 (concurrent job log updates): those two tackle the write-path; this change tackles the dispatch-path.
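The routing described above presumably looks something like this in Django settings (the queue names and task names come from the PR; the dotted task paths are assumptions for illustration):

```python
# Sketch of the queue-split routing. Only the task and queue names are
# confirmed by the PR -- the dotted module paths below are guesses.
CELERY_TASK_DEFAULT_QUEUE = "antenna"

CELERY_TASK_ROUTES = {
    "ami.jobs.tasks.run_job": {"queue": "jobs"},
    "ami.ml.tasks.process_nats_pipeline_result": {"queue": "ml_results"},
    "ami.ml.tasks.save_results": {"queue": "ml_results"},
}
```

Anything not matched by a route falls through to the default `antenna` queue.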
Match the production start script: read the queue list from $CELERY_QUEUES, defaulting to all three queues (antenna, jobs, ml_results) so the single local worker keeps consuming everything by default. Lets devs override for isolation testing if they want. Co-Authored-By: Claude <noreply@anthropic.com>
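The `${VAR:-default}` fallback such a start script relies on can be shown in isolation (the actual celery invocation is omitted; only the variable handling is demonstrated):

```shell
# Resolve the queue list the way the start script would:
# use $CELERY_QUEUES if set, otherwise fall back to all three queues.
unset CELERY_QUEUES
QUEUES="${CELERY_QUEUES:-antenna,jobs,ml_results}"
echo "default: $QUEUES"

CELERY_QUEUES="antenna"
QUEUES="${CELERY_QUEUES:-antenna,jobs,ml_results}"
echo "override: $QUEUES"
```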
✅ Deploy Preview for antenna-ssec canceled.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough

Worker startup scripts and Docker Compose were changed to select Celery queues from a `CELERY_QUEUES` environment variable; Celery task routing was added to send specific tasks to dedicated queues.

Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 3 passed checks
Pull request overview
This PR introduces a Celery queue split to isolate long-running job execution and bursty ML-result processing from default “housekeeping” tasks, reducing cross-task starvation in RabbitMQ/Celery deployments.
Changes:
- Add `CELERY_TASK_ROUTES` to route `run_job` → `jobs` and ML-result tasks → `ml_results` (default queue remains `antenna`).
- Parameterize Celery worker startup to consume queues from `CELERY_QUEUES`, and update staging/production/worker-host compose stacks to run dedicated workers per queue.
- Add an internal rollout/verification plan document for demo → production deployment sequencing.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/claude/planning/celery-queue-split-rollout.md | Adds rollout + verification + rollback procedure for the queue split. |
| docker-compose.worker.yml | Configures worker-only hosts to consume all three queues by default via CELERY_QUEUES. |
| docker-compose.staging.yml | Splits staging workers into three dedicated services (antenna, jobs, ml_results). |
| docker-compose.production.yml | Splits production workers into three dedicated services (antenna, jobs, ml_results). |
| config/settings/base.py | Sets default queue to antenna and adds task routing rules for the queue split. |
| compose/production/django/celery/worker/start | Reads CELERY_QUEUES to control which queues a worker consumes in prod images. |
| compose/local/django/celery/worker/start | Updates local worker startup to consume multiple queues by default (but currently has a quoting bug). |
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docker-compose.worker.yml`:
- Around line 29-33: The docker-compose.worker.yml currently hard-codes
CELERY_QUEUES in the environment block which overrides values from the service
env_file; remove the hard-coded value so the service picks up CELERY_QUEUES from
each host’s .env, or replace the hard-code with a shell-style fallback like
${CELERY_QUEUES:-antenna,jobs,ml_results} to preserve a default while allowing
env_file or external overrides; update the environment entry referencing
CELERY_QUEUES accordingly and ensure host .env files contain any host-specific
defaults.
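The shell-style fallback the review suggests would look roughly like this in the compose file (a sketch; the service name and surrounding keys are assumed):

```yaml
services:
  celeryworker:
    env_file: .env
    environment:
      # Host .env or the shell can override; otherwise all three queues.
      CELERY_QUEUES: "${CELERY_QUEUES:-antenna,jobs,ml_results}"
```

Compose resolves `${CELERY_QUEUES:-…}` from the shell environment at `docker compose up` time, so a host-specific value wins while the default still covers unconfigured hosts.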
In `@docs/claude/planning/celery-queue-split-rollout.md`:
- Around line 80-85: Update the production validation described in the rollout
doc to relax the strict consumer-count check: change the "production points to
the demo check" guidance to either validate the expected topology (recognizing
worker-only spillover hosts that may consume all three queues and the presence
of ami-live adding dedicated services celeryworker_jobs and celeryworker_ml) or
at minimum assert there is at least one consumer per queue instead of enforcing
a specific count; ensure the wording distinguishes demo vs production
expectations and mentions the spillover hosts and dedicated services by name.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bca26de9-1608-4a29-9c4c-def26d972c1a
📒 Files selected for processing (7)
- compose/local/django/celery/worker/start
- compose/production/django/celery/worker/start
- config/settings/base.py
- docker-compose.production.yml
- docker-compose.staging.yml
- docker-compose.worker.yml
- docs/claude/planning/celery-queue-split-rollout.md
This task is emitted from save_results (pipeline.py:990, one delay per batch of source images) and does heavy image cropping + S3 writes. Left unrouted, it defaults to the antenna queue — the opposite of what the queue-split is trying to achieve, since a single large job's cropping fan-out can then starve beat/housekeeping. Co-Authored-By: Claude <noreply@anthropic.com>
Production topology previously put all three worker services (antenna, jobs, ml_results) on ami-live, which meant the bursty ML pool was competing with Django/beat/flower for CPU and RAM. With `CELERY_WORKER_CONCURRENCY=16` inherited per service, that's 48 prefork processes before any dedicated worker VM spins up. Now:

- `docker-compose.production.yml` runs only the antenna worker (alongside Django + beat + flower on the app host).
- `docker-compose.worker.yml` runs three dedicated services (antenna / jobs / ml_results) per worker VM, so isolation holds there too; a burst on one class can't saturate a shared pool and starve another.

Rollout doc updated to reflect the new topology. Co-Authored-By: Claude <noreply@anthropic.com>
- Standardize rollout doc script name to reset_demo_to_branch.sh
- Clarify settings comment: only staging/production/worker composes run per-queue dedicated workers; local/CI use a single all-queues worker

Co-Authored-By: Claude <noreply@anthropic.com>
🧹 Nitpick comments (1)
docker-compose.worker.yml (1)
38-63: Optional: DRY the three near-identical worker services via a YAML anchor.

The three `celeryworker*` services differ only in `CELERY_QUEUES`. Consider extracting a `&celeryworker_base` anchor to reduce duplication and keep future changes (e.g., `command`, `restart`, `ports`) in sync across all three.

♻️ Suggested refactor

```diff
-  celeryworker:
-    <<: *django
+  celeryworker: &celeryworker_base
+    <<: *django
     scale: 1
     ports: []
     command: /start-celeryworker
     environment:
       CELERY_QUEUES: "antenna"
     restart: always

   celeryworker_jobs:
-    <<: *django
-    scale: 1
-    ports: []
-    command: /start-celeryworker
+    <<: *celeryworker_base
     environment:
       CELERY_QUEUES: "jobs"
-    restart: always

   celeryworker_ml:
-    <<: *django
-    scale: 1
-    ports: []
-    command: /start-celeryworker
+    <<: *celeryworker_base
     environment:
       CELERY_QUEUES: "ml_results"
-    restart: always
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.worker.yml` around lines 38 - 63, The three services celeryworker, celeryworker_jobs, and celeryworker_ml are duplicated except for CELERY_QUEUES; introduce a YAML anchor (e.g., &celeryworker_base) containing the shared keys (<<: *django, scale, ports, command, restart, environment defaults if any) and then use the anchor with YAML merge or alias for each service, overriding only the environment CELERY_QUEUES values for "antenna", "jobs", and "ml_results" respectively so future common changes stay in one place.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@docker-compose.worker.yml`:
- Around line 38-63: The three services celeryworker, celeryworker_jobs, and
celeryworker_ml are duplicated except for CELERY_QUEUES; introduce a YAML anchor
(e.g., &celeryworker_base) containing the shared keys (<<: *django, scale,
ports, command, restart, environment defaults if any) and then use the anchor
with YAML merge or alias for each service, overriding only the environment
CELERY_QUEUES values for "antenna", "jobs", and "ml_results" respectively so
future common changes stay in one place.
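Applied in full, the anchor refactor suggested in this nitpick would leave the compose file looking roughly like this (a sketch; keys beyond `CELERY_QUEUES` are taken from the review text):

```yaml
  celeryworker: &celeryworker_base
    <<: *django
    scale: 1
    ports: []
    command: /start-celeryworker
    environment:
      CELERY_QUEUES: "antenna"
    restart: always

  celeryworker_jobs:
    <<: *celeryworker_base
    environment:
      CELERY_QUEUES: "jobs"

  celeryworker_ml:
    <<: *celeryworker_base
    environment:
      CELERY_QUEUES: "ml_results"
```

Note that YAML merge replaces the whole `environment` mapping on override, so each service must restate every environment key it needs, not just the changed one.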
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a78b0748-3a7b-460a-973e-384c13896a57
📒 Files selected for processing (4)
- config/settings/base.py
- docker-compose.production.yml
- docker-compose.worker.yml
- docs/claude/planning/celery-queue-split-rollout.md
✅ Files skipped from review due to trivial changes (2)
- docker-compose.production.yml
- docs/claude/planning/celery-queue-split-rollout.md
🚧 Files skipped from review as they are similar to previous changes (1)
- config/settings/base.py
The script is used on staging, demo, and single-box deploys — not demo-specific. Standardize both mentions to reset_to_branch.sh, which matches the actual filename on the hosts. Co-Authored-By: Claude <noreply@anthropic.com>
- Generalize ami-live / ami-worker-2 / ami-worker-3 hostnames: this doc lives in a public repo and shouldn't reference deployment-specific names
- Drop stale commit SHA; branch name is sufficient after further commits
- Clarify that the "scp three files" list is the demo-path subset, not the full changeset on the branch

Co-Authored-By: Claude <noreply@anthropic.com>
Claude says: Final review pass summary (3 agents: code-review, silent-failure, comment-accuracy). Fixed this round
Commits: Validated as OK
Flagged as follow-ups (not blockers)
Happy to open tickets for (1) and (2) if they're not already tracked.
Claude says: Answers:

- CI & local start script, why tests pass: only the production start script (…)
- You're right about the stale-PENDING reaper: I was wrong.
- How a consumer-count health check would work: think of it as a sibling to …
Where it reports to:
Tick rate: something like every 5 min, enough to catch a bad deploy well before …

I'll leave this as a follow-up (noted in the updated PR description under "Follow-ups (deferred)"); not a blocker for merge since …
Summary
Splits Celery tasks across three dedicated queues so one class of task cannot starve another:

| Queue | Tasks |
|---|---|
| `antenna` | default queue (everything unrouted) |
| `jobs` | `run_job` |
| `ml_results` | `process_nats_pipeline_result`, `save_results`, `create_detection_images` |

The worker start script is parameterized via `CELERY_QUEUES` (default differs per script), so one image serves all three services.

Topology

- App host: runs only the `antenna` worker, alongside Django + beat + flower. Heavy bursty work is kept off the app host so the ML pool doesn't compete with request handling for CPU/RAM.
- Worker hosts: consume all three queues (`antenna`, `jobs`, `ml_results`) via `docker-compose.worker.yml`. Each queue gets its own container on each worker host so a burst on one class cannot saturate a shared pool and starve another.
- Local/CI: a single `celeryworker` consumes all three queues (the `CELERY_QUEUES` default in `compose/local/django/celery/worker/start` is `antenna,jobs,ml_results`). No per-queue services needed; this is why existing tests continue to pass without compose changes for CI.

Motivation
On a production async_api job of ~740 images, `run_job` invocations for newly submitted jobs sat in PENDING for many minutes behind the flood of `process_nats_pipeline_result` tasks the first job was emitting. With `prefetch_multiplier=1` and `concurrency=8`, a single worker's slot pool was saturated by ML result processing. Splitting gives each class its own pool.

Validation
Deployed and tested on both preview environments (single-host, three workers each).
Staging (integration branch: queue split + pgbouncer + main):

- `rabbitmqctl list_queues` shows `antenna`, `jobs`, `ml_results` with 1 consumer each, idle at 0 between dispatches
- `app.amqp.router` resolves `run_job` → `jobs`, `process_nats_pipeline_result` → `ml_results`, `save_results` → `ml_results`, `create_detection_images` → `ml_results`, default → `antenna`
- `test_ml_job_e2e` async_api job (20 images, panama pipeline): 24s end-to-end, all three stages SUCCESS
- … `antenna` while the job occupied `ml_results`

Demo (queue split alone):

- `test_ml_job_e2e` async_api job (10 images, quebec pipeline): 18s end-to-end, all three stages SUCCESS

Test plan
Path to production
Deploy order matters so tasks don't pile up on an unread queue:
1. Worker hosts first: `docker-compose.worker.yml` now runs three services; deploying here first ensures every queue has a consumer before the app host stops serving the old ones.
2. App host second: `docker-compose.production.yml` now runs only the `antenna` worker; the previous single-queue worker is reconfigured to the `antenna` queue.

Post-deploy verification:

- `rabbitmqctl list_queues name messages consumers` shows three queues, each with at least 1 consumer (2 = one per worker host, for `jobs` and `ml_results`)
- `celeryworker_jobs` and `celeryworker_ml` containers only on worker hosts, not on the app host
- `run_job` starts within seconds even during async_api result bursts

Things that could go wrong

- `check_stale_jobs` (beat task, `STALLED_JOBS_MAX_MINUTES` cutoff) will also catch a never-consumed `run_job` since PENDING is in `JobState.running_states()`, so a truly stuck job eventually gets revoked and surfaced in the UI.
- … `CELERY_TASK_ROUTES` key. Currently routed tasks are not in the beat schedule.
- … `CELERY_WORKER_CONCURRENCY`). Watch RSS after deploy; lower `CELERY_WORKER_CONCURRENCY` in the worker host `.env` if memory is tight.

Follow-ups (deferred)

- A health check that runs `rabbitmqctl list_queues name consumers` (or the management API) every N minutes and emits an error log when any of `antenna`/`jobs`/`ml_results` has 0 consumers. Would shave the detection window for "worker host flapped" scenarios without waiting for `check_stale_jobs` to fire.
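The deferred consumer-count health check could be sketched as a plain function like the one below (a hypothetical implementation: the function name, parsing, and wiring are assumptions; only the queue names and the `rabbitmqctl` command come from this PR):

```python
import logging
import subprocess

logger = logging.getLogger(__name__)

# Queue names from the PR; order fixed so results are deterministic.
EXPECTED_QUEUES = ("antenna", "jobs", "ml_results")


def check_queue_consumers(output=None):
    """Return the expected queues that currently have zero consumers.

    `output` can be injected for testing; otherwise
    `rabbitmqctl list_queues name consumers` is invoked.
    """
    if output is None:
        output = subprocess.run(
            ["rabbitmqctl", "list_queues", "name", "consumers"],
            capture_output=True, text=True, check=True,
        ).stdout
    consumers = {}
    for line in output.splitlines():
        parts = line.split()
        # Keep only "name <count>" rows; skip headers and banner lines.
        if len(parts) == 2 and parts[1].isdigit():
            consumers[parts[0]] = int(parts[1])
    starved = [q for q in EXPECTED_QUEUES if consumers.get(q, 0) == 0]
    for q in starved:
        logger.error("Celery queue %r has no consumers", q)
    return starved
```

Scheduled from beat every few minutes, an empty return value means every queue has at least one consumer; a non-empty list is the "worker host flapped" signal described above.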