
feat: assign different categories of background tasks to separate queues#1257

Merged
mihow merged 8 commits into main from feat/celery-queue-split
Apr 20, 2026
Conversation

@mihow
Collaborator

@mihow mihow commented Apr 20, 2026

Summary

Splits Celery tasks across three dedicated queues so one class of task cannot starve another:

| Queue | Tasks | Rationale |
| --- | --- | --- |
| `antenna` | default — beat, cache refresh, sync, misc | Fast, high-turnover housekeeping |
| `jobs` | `run_job` | Can hold a worker slot for hours |
| `ml_results` | `process_nats_pipeline_result`, `save_results`, `create_detection_images` | Bursty, ~180 tasks / 5 min per active async_api job |
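The routing in the table can be sketched as a `CELERY_TASK_ROUTES` setting. The task paths are taken from this PR's change summary; treat this as a sketch of the shape, not the exact diff in `config/settings/base.py`:

```python
# Sketch of the task routing this PR adds (task paths per the PR summary;
# the exact settings file may differ slightly).
CELERY_TASK_ROUTES = {
    "ami.jobs.tasks.run_job": {"queue": "jobs"},
    "ami.jobs.tasks.process_nats_pipeline_result": {"queue": "ml_results"},
    "ami.ml.models.pipeline.save_results": {"queue": "ml_results"},
    "ami.ml.tasks.create_detection_images": {"queue": "ml_results"},
}

# Anything not listed falls through to the default queue.
CELERY_TASK_DEFAULT_QUEUE = "antenna"
```

Unrouted tasks (beat, cache refresh, sync) hit the default queue, which every host consumes.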

The worker start script is parameterized via CELERY_QUEUES (the default differs per script), so a single image serves all three services.
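A minimal sketch of what such a parameterized start script looks like — an assumed shape, not the actual contents of `compose/*/django/celery/worker/start` (the `celery` invocation in the comment is illustrative):

```shell
#!/bin/sh
# Sketch only: read the queue list from the environment with a default.
# The production script defaults to "antenna"; the local script defaults
# to all three queues ("antenna,jobs,ml_results").
QUEUES="${CELERY_QUEUES:-antenna}"

# The real script would exec the worker, roughly:
#   exec celery -A config.celery_app worker --queues="$QUEUES" -l INFO
echo "would start: celery worker --queues=$QUEUES"
```

Because `${VAR:-default}` treats empty the same as unset, compose files can override the queue list per service while a bare container still gets a sane default.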

Topology

  • App host (production): runs only the antenna worker, alongside Django + beat + flower. Heavy bursty work is kept off the app host so the ML pool doesn't compete with request handling for CPU/RAM.
  • Dedicated worker hosts (production): run all three services (antenna, jobs, ml_results) via docker-compose.worker.yml. Each queue gets its own container on each worker host so a burst on one class cannot saturate a shared pool and starve another.
  • Staging: single-host deploy runs all three worker services on one box.
  • Local dev / CI: a single celeryworker consumes all three queues (CELERY_QUEUES default in compose/local/django/celery/worker/start is antenna,jobs,ml_results). No per-queue services needed — this is why existing tests continue to pass without compose changes for CI.
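The per-queue services on the worker hosts differ only in CELERY_QUEUES. A sketch of the docker-compose.worker.yml shape, assuming the `&django` anchor that the compose files already define (per the review discussion below; field order and extra keys like `scale`/`ports` omitted here):

```yaml
# Sketch — assumes a *django anchor defined earlier in the compose file.
celeryworker:
  <<: *django
  command: /start-celeryworker
  environment:
    CELERY_QUEUES: "antenna"
  restart: always

celeryworker_jobs:
  <<: *django
  command: /start-celeryworker
  environment:
    CELERY_QUEUES: "jobs"
  restart: always

celeryworker_ml:
  <<: *django
  command: /start-celeryworker
  environment:
    CELERY_QUEUES: "ml_results"
  restart: always
```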

Motivation

On a production async_api job of ~740 images, run_job invocations for newly submitted jobs sat in PENDING for many minutes behind the flood of process_nats_pipeline_result tasks the first job was emitting. With prefetch_multiplier=1 and concurrency=8, a single worker's slot pool was saturated by ML result processing. Splitting gives each class its own pool.

Validation

Deployed and tested on both preview environments (single-host, three workers each).

Staging (integration branch — queue split + pgbouncer + main):

  • rabbitmqctl list_queues shows antenna, jobs, ml_results with 1 consumer each, idle at 0 between dispatches
  • app.amqp.router resolves run_job → jobs, process_nats_pipeline_result → ml_results, save_results → ml_results, create_detection_images → ml_results, default → antenna
  • test_ml_job_e2e async_api job (20 images, panama pipeline): 24s end-to-end, all three stages SUCCESS
  • Beat tasks continued flowing on antenna while the job occupied ml_results

Demo (queue split alone):

  • Same queue / consumer topology check passed
  • test_ml_job_e2e async_api job (10 images, quebec pipeline): 18s end-to-end, all three stages SUCCESS
  • Larger load test with multi-hundred-image async_api job also ran clean

Test plan

  • Local single-worker validation (all three queues consumed by one container)
  • Staging deploy — three workers, three queues, one consumer each; e2e async_api job SUCCESS
  • Demo deploy — same verification, independent env
  • Demo load test with a larger job (to stress-test isolation, not just routing)
  • Code review (queue routing is a system-level change; at least one reviewer familiar with the Celery topology)
  • Production rollout (see below)

Path to production

Deploy order matters so tasks don't pile up on an unread queue:

  1. First: worker-only hosts. docker-compose.worker.yml now runs three services — deploying here first ensures every queue has a consumer before the app host stops serving the old ones.
  2. Second: app host. docker-compose.production.yml now runs only the antenna worker; the previous single-queue worker is reconfigured to the antenna queue.

Post-deploy verification:

  • rabbitmqctl list_queues name messages consumers shows three queues, each with at least 1 consumer (2 = one per worker host, for jobs and ml_results)
  • Flower shows celeryworker_jobs and celeryworker_ml containers only on worker hosts, not on the app host
  • A small run_job starts within seconds even during async_api result bursts

Things that could go wrong

  • A new queue has zero consumers — tasks pile up silently at the broker. Mitigated by the deploy order above and the "≥1 consumer each" post-deploy check. check_stale_jobs (beat task, STALLED_JOBS_MAX_MINUTES cutoff) will also catch a never-consumed run_job since PENDING is in JobState.running_states(), so a truly stuck job eventually gets revoked and surfaced in the UI.
  • Django publisher reads stale routes — the settings change is read at process start; django container must be recreated alongside the workers.
  • Beat scheduler uses wrong queue for a scheduled task — only an issue if a beat task name happens to match a CELERY_TASK_ROUTES key. Currently routed tasks are not in the beat schedule.
  • Worker VM memory pressure — splitting the previous single-container worker into three triples the baseline prefork process count on each VM (3 × CELERY_WORKER_CONCURRENCY). Watch RSS after deploy; lower CELERY_WORKER_CONCURRENCY in the worker host .env if memory is tight.

Follow-ups (deferred)

  • Active consumer-count monitor — a beat task that calls rabbitmqctl list_queues name consumers (or the management API) every N minutes and emits an error log when any of antenna/jobs/ml_results has 0 consumers. Would shave the detection window for "worker host flapped" scenarios without waiting for check_stale_jobs to fire.
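A hypothetical sketch of that monitor, with the decision logic split out so it is testable. The queue names come from this PR; the function names, error code, and URL handling are illustrative, and authentication is omitted for brevity (RabbitMQ's management API endpoint is `GET /api/queues/<vhost>/<queue>`):

```python
# Hypothetical consumer-count monitor sketch (not code from this PR).
import json
import logging
import urllib.parse
import urllib.request

logger = logging.getLogger(__name__)
QUEUES = ("antenna", "jobs", "ml_results")


def queues_without_consumers(consumer_counts):
    """Pure decision logic: which monitored queues have zero consumers?"""
    return [q for q in QUEUES if consumer_counts.get(q, 0) == 0]


def fetch_consumer_counts(api_base, vhost, opener=urllib.request.urlopen):
    """Read the `consumers` field for each queue from the management API."""
    counts = {}
    for queue in QUEUES:
        url = f"{api_base}/api/queues/{urllib.parse.quote(vhost, safe='')}/{queue}"
        with opener(url) as resp:
            counts[queue] = json.load(resp).get("consumers", 0)
    return counts


def check_queue_consumers(api_base, vhost):
    """Beat-task body: emit a stable error code per starved queue."""
    for queue in queues_without_consumers(fetch_consumer_counts(api_base, vhost)):
        logger.error("CELERY_QUEUE_NO_CONSUMER: %s", queue)
```

Keeping `queues_without_consumers` pure means the alerting condition can be unit-tested without a broker.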

mihow and others added 3 commits April 19, 2026 22:38
…tarvation

Previously all Celery tasks shared a single 'antenna' queue, so a burst of
high-volume tasks could block lower-volume ones on the same worker pool.

Observed scenario: ~740-image async_api job emitted ~180
process_nats_pipeline_result tasks per 5 min and starved run_job
invocations behind it, leaving newly submitted jobs stuck in PENDING for
many minutes. Long-running run_job tasks can similarly hold worker slots
and delay beat / housekeeping tasks.

Split into three queues, each with its own worker service:

  antenna     default — beat tasks, cache refresh, sync, housekeeping
  jobs        run_job (can hold a slot for hours)
  ml_results  process_nats_pipeline_result + save_results bursts

Worker start script now takes CELERY_QUEUES as env var (default: antenna)
so one image serves all three services. Worker-only hosts (ami-worker-2,
ami-worker-3) consume all three queues as spillover capacity via
docker-compose.worker.yml.

Relates to #1256 (job logging bottleneck) and #1026 (concurrent job log
updates) — those two tackle the write-path; this change tackles the
dispatch-path.
Match the production start script: read the queue list from
$CELERY_QUEUES, defaulting to all three queues (antenna, jobs,
ml_results) so the single local worker keeps consuming everything
by default. Lets devs override for isolation testing if they want.

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 20, 2026 06:41

@coderabbitai
Contributor

coderabbitai Bot commented Apr 20, 2026

📝 Walkthrough

Worker startup scripts and Docker Compose were changed to select Celery queues from a CELERY_QUEUES environment variable; Celery task routing was added to send specific tasks to jobs and ml_results queues; compose files add per-queue worker services and production/staging defaults; rollout docs added.

Changes

  • Worker Startup Scripts (compose/local/django/celery/worker/start, compose/production/django/celery/worker/start) — Read CELERY_QUEUES (with defaults) and pass --queues="$QUEUES" to Celery/watchfiles instead of the hard-coded --queues=antenna, allowing env-driven queue selection.
  • Celery Configuration (config/settings/base.py) — Added CELERY_TASK_ROUTES mapping tasks to queues: ami.jobs.tasks.run_job → jobs; ami.jobs.tasks.process_nats_pipeline_result, ami.ml.models.pipeline.save_results, ami.ml.tasks.create_detection_images → ml_results.
  • Docker Compose Services (docker-compose.production.yml, docker-compose.staging.yml, docker-compose.worker.yml) — Set CELERY_QUEUES for the existing celeryworker to "antenna"; added celeryworker_jobs (CELERY_QUEUES="jobs") and celeryworker_ml (CELERY_QUEUES="ml_results") services in the staging/worker compose files; ensured restart: always and scale: 1 where applicable.
  • Documentation (docs/claude/planning/celery-queue-split-rollout.md) — New rollout plan detailing the queue-splitting architecture, deployment/demo steps, pre/post checks, rollback, failure modes, and production rollout order.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Suggested reviewers

  • carlosgjs

Poem

🐰 Hopped in with a cheerful squeak,
Three queues planted by my cheek.
Jobs and ML scurry on their way,
Antenna hums to save the day—
A rabbit's nod to queues at play. 🥕

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: assigning different task categories to separate Celery queues to prevent starvation. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage; skipping the check. |
| Description check | ✅ Passed | The description is comprehensive and well-structured: a clear summary with the queue topology, motivation explaining the production issue, validation across staging and demo, a detailed test plan, explicit rollout instructions, and mitigations for potential failure modes. |



Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a Celery queue split to isolate long-running job execution and bursty ML-result processing from default “housekeeping” tasks, reducing cross-task starvation in RabbitMQ/Celery deployments.

Changes:

  • Add CELERY_TASK_ROUTES to route run_job → jobs and ML-result tasks → ml_results (the default queue remains antenna).
  • Parameterize Celery worker startup to consume queues from CELERY_QUEUES, and update staging/production/worker-host compose stacks to run dedicated workers per queue.
  • Add an internal rollout/verification plan document for demo → production deployment sequencing.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
| File | Description |
| --- | --- |
| docs/claude/planning/celery-queue-split-rollout.md | Adds rollout + verification + rollback procedure for the queue split. |
| docker-compose.worker.yml | Configures worker-only hosts to consume all three queues by default via CELERY_QUEUES. |
| docker-compose.staging.yml | Splits staging workers into three dedicated services (antenna, jobs, ml_results). |
| docker-compose.production.yml | Splits production workers into three dedicated services (antenna, jobs, ml_results). |
| config/settings/base.py | Sets the default queue to antenna and adds task routing rules for the queue split. |
| compose/production/django/celery/worker/start | Reads CELERY_QUEUES to control which queues a worker consumes in prod images. |
| compose/local/django/celery/worker/start | Updates local worker startup to consume multiple queues by default (but currently has a quoting bug). |


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docker-compose.worker.yml`:
- Around line 29-33: The docker-compose.worker.yml currently hard-codes
CELERY_QUEUES in the environment block which overrides values from the service
env_file; remove the hard-coded value so the service picks up CELERY_QUEUES from
each host’s .env, or replace the hard-code with a shell-style fallback like
${CELERY_QUEUES:-antenna,jobs,ml_results} to preserve a default while allowing
env_file or external overrides; update the environment entry referencing
CELERY_QUEUES accordingly and ensure host .env files contain any host-specific
defaults.

In `@docs/claude/planning/celery-queue-split-rollout.md`:
- Around line 80-85: Update the production validation described in the rollout
doc to relax the strict consumer-count check: change the "production points to
the demo check" guidance to either validate the expected topology (recognizing
worker-only spillover hosts that may consume all three queues and the presence
of ami-live adding dedicated services celeryworker_jobs and celeryworker_ml) or
at minimum assert there is at least one consumer per queue instead of enforcing
a specific count; ensure the wording distinguishes demo vs production
expectations and mentions the spillover hosts and dedicated services by name.


📥 Commits

Reviewing files that changed from the base of the PR and between c82fe02 and c246f90.

📒 Files selected for processing (7)
  • compose/local/django/celery/worker/start
  • compose/production/django/celery/worker/start
  • config/settings/base.py
  • docker-compose.production.yml
  • docker-compose.staging.yml
  • docker-compose.worker.yml
  • docs/claude/planning/celery-queue-split-rollout.md

@mihow mihow changed the title feat(celery): split tasks across three queues to prevent cross-task starvation feat: assign different categories of background tasks to separate queues Apr 20, 2026
mihow and others added 3 commits April 19, 2026 23:52
This task is emitted from save_results (pipeline.py:990, one delay per
batch of source images) and does heavy image cropping + S3 writes. Left
unrouted, it defaults to the antenna queue — the opposite of what the
queue-split is trying to achieve, since a single large job's cropping
fan-out can then starve beat/housekeeping.

Co-Authored-By: Claude <noreply@anthropic.com>
Production topology previously put all three worker services (antenna,
jobs, ml_results) on ami-live, which meant the bursty ML pool was
competing with Django/beat/flower for CPU and RAM. With
CELERY_WORKER_CONCURRENCY=16 inherited per service, that's 48 prefork
processes before any dedicated worker VM spins up.

Now:
- docker-compose.production.yml runs only the antenna worker (alongside
  Django + beat + flower on the app host).
- docker-compose.worker.yml runs three dedicated services (antenna /
  jobs / ml_results) per worker VM, so isolation holds there too — a
  burst on one class can't saturate a shared pool and starve another.

Rollout doc updated to reflect the new topology.

Co-Authored-By: Claude <noreply@anthropic.com>
- Standardize rollout doc script name to reset_demo_to_branch.sh
- Clarify settings comment: only staging/production/worker composes run
  per-queue dedicated workers; local/CI use a single all-queues worker

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
docker-compose.worker.yml (1)

38-63: Optional: DRY the three near-identical worker services via a YAML anchor.

The three celeryworker* services differ only in CELERY_QUEUES. Consider extracting a &celeryworker_base anchor to reduce duplication and keep future changes (e.g., command, restart, ports) in sync across all three.

♻️ Suggested refactor
-  celeryworker:
-    <<: *django
+  celeryworker: &celeryworker_base
+    <<: *django
     scale: 1
     ports: []
     command: /start-celeryworker
     environment:
       CELERY_QUEUES: "antenna"
     restart: always

   celeryworker_jobs:
-    <<: *django
-    scale: 1
-    ports: []
-    command: /start-celeryworker
+    <<: *celeryworker_base
     environment:
       CELERY_QUEUES: "jobs"
-    restart: always

   celeryworker_ml:
-    <<: *django
-    scale: 1
-    ports: []
-    command: /start-celeryworker
+    <<: *celeryworker_base
     environment:
       CELERY_QUEUES: "ml_results"
-    restart: always
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.worker.yml` around lines 38 - 63, The three services
celeryworker, celeryworker_jobs, and celeryworker_ml are duplicated except for
CELERY_QUEUES; introduce a YAML anchor (e.g., &celeryworker_base) containing the
shared keys (<<: *django, scale, ports, command, restart, environment defaults
if any) and then use the anchor with YAML merge or alias for each service,
overriding only the environment CELERY_QUEUES values for "antenna", "jobs", and
"ml_results" respectively so future common changes stay in one place.


📥 Commits

Reviewing files that changed from the base of the PR and between c246f90 and 11ea628.

📒 Files selected for processing (4)
  • config/settings/base.py
  • docker-compose.production.yml
  • docker-compose.worker.yml
  • docs/claude/planning/celery-queue-split-rollout.md
✅ Files skipped from review due to trivial changes (2)
  • docker-compose.production.yml
  • docs/claude/planning/celery-queue-split-rollout.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • config/settings/base.py

mihow and others added 2 commits April 20, 2026 15:09
The script is used on staging, demo, and single-box deploys — not
demo-specific. Standardize both mentions to reset_to_branch.sh, which
matches the actual filename on the hosts.

Co-Authored-By: Claude <noreply@anthropic.com>
- Generalize ami-live / ami-worker-2 / ami-worker-3 hostnames — this doc
  lives in a public repo and shouldn't reference deployment-specific names
- Drop stale commit SHA; branch name is sufficient after further commits
- Clarify that the "scp three files" list is the demo-path subset, not
  the full changeset on the branch

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow
Collaborator Author

mihow commented Apr 20, 2026

Claude says: Final review pass summary (3 agents: code-review, silent-failure, comment-accuracy).

Fixed this round

  • config/settings/base.py — clarified queue comment (local/CI use single worker, not per-queue)
  • docs/claude/planning/celery-queue-split-rollout.md — standardized script name to reset_to_branch.sh; scrubbed internal hostnames (ami-live, ami-worker-2/3) since this doc lives in a public repo; dropped stale commit SHA; clarified the scp file list is the demo-path subset

Commits: 11ea6289, d710f90e, 1edb4687.

Validated as OK

  • Routing coverage: all .delay()/.apply_async() sites either match a CELERY_TASK_ROUTES entry or fall through to the default antenna queue (consumed on every host). Transitive enqueues from save_results and process_nats_pipeline_result stay on ml_results.
  • Beat tasks (check_processing_services_online, jobs_health_check) correctly default to antenna.
  • Compose YAML anchors + environment/ports overrides are well-formed across staging / production / worker.
  • Shell quoting in the local worker start script is valid concatenation (same pattern as the pre-existing $MAX_TASKS_PER_CHILD handling).

Flagged as follow-ups (not blockers)

  1. No runtime guard for "queue has zero consumers" — if worker hosts are down, jobs/ml_results publishes pile up silently. Current mitigation is operator-dependent (deploy-order in the rollout doc). A beat-scheduled consumer-count health check on top of antenna would close this gap.
  2. run_job has no retry + no stale-PENDING reaper — if jobs queue has no consumer at submit time, the Job row sits PENDING indefinitely. Pre-existing; the queue split makes it slightly more likely by moving the consumer to an external host. Worth a check_stale_jobs-style reaper as a follow-up.

Happy to open tickets for (1) and (2) if they're not already tracked.

@mihow
Collaborator Author

mihow commented Apr 20, 2026

Claude says: Answers:

CI & local start script — why tests pass. docker-compose.ci.yml uses the YAML merge anchor (<<: *django), so its celeryworker inherits the local Dockerfile + the same /start-celeryworker command. That hits compose/local/django/celery/worker/start, which defaults CELERY_QUEUES="${CELERY_QUEUES:-antenna,jobs,ml_results}". So CI's single worker consumes all three queues — no split, no risk of an unrouted task, tests continue to pass without compose changes. Same behavior in local dev: docker compose up gives one celeryworker container that picks up everything.

Only the production start script (compose/production/django/celery/worker/start) defaults to CELERY_QUEUES=antenna. Staging/production/worker composes each set it explicitly per service.

You're right about the stale-PENDING reaper — I was wrong. check_stale_jobs (ami/jobs/tasks.py:773) filters on status__in=JobState.running_states(), which includes PENDING. So a run_job task that was published to a queue with no consumer would still have a stale updated_at (no progress ticks), get caught by check_stale_jobs on the next jobs_health_check beat tick, checked against Celery state, and revoked with a user-visible log. Not a silent failure. I've edited the PR description to reflect this.
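The cutoff logic being described reduces to a timestamp comparison — a minimal sketch, where the constant's value and the function name are illustrative (the real reaper lives in ami/jobs/tasks.py and operates on Job rows):

```python
from datetime import datetime, timedelta, timezone

STALLED_JOBS_MAX_MINUTES = 60  # illustrative; the real cutoff is a setting


def is_stale(last_updated, now=None, cutoff_minutes=STALLED_JOBS_MAX_MINUTES):
    """A job in a running state (PENDING included) counts as stale once its
    last progress tick is older than the cutoff."""
    now = now or datetime.now(timezone.utc)
    return now - last_updated > timedelta(minutes=cutoff_minutes)
```

A run_job published to a consumerless queue never gets a progress tick, so its `updated_at` ages past the cutoff and the beat-scheduled check catches it.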

How a consumer-count health check would work. Think of it as a sibling to check_processing_services_online — a beat-scheduled Celery task (on antenna, so it always runs) that:

  1. Queries RabbitMQ via the management HTTP API (GET /api/queues/<vhost>/<queue>) — simpler than shelling out to rabbitmqctl from inside a container, and the credentials already exist in .envs/.production/.
  2. For each of antenna / jobs / ml_results, reads the consumers field.
  3. If any queue has 0 consumers, logger.error(...) with a stable error code (e.g. CELERY_QUEUE_NO_CONSUMER: jobs).

Where it reports to:

  • The error log lands in the normal Django/Celery log stream → New Relic picks it up (prod already has the NR agent in the worker images).
  • An NR alert on the error code fires a notification — same mechanism as other platform alerts.
  • Optionally: surface in the admin "System health" page alongside check_processing_services_online results so it's visible without opening NR.

Tick rate: something like every 5 min — enough to catch a bad deploy well before STALLED_JOBS_MAX_MINUTES kicks in, cheap on the broker.

I'll leave this as a follow-up (noted in the updated PR description under "Follow-ups (deferred)") — not a blocker for merge since check_stale_jobs is the safety net.

@mihow mihow merged commit 7f35bc5 into main Apr 20, 2026
7 checks passed
@mihow mihow deleted the feat/celery-queue-split branch April 20, 2026 22:40