
Conversation

@NicoHinderling
Contributor

After merging #504 we noticed that there were still cases of the poll interval timeout happening. Upon further investigation, we realized the following is happening:

Normal Processing:
  Message 1 → Worker spawns subprocess, waits
  Message 2 → Queued in ThreadPoolExecutor (concurrency=1, worker busy)

Rebalance:
  1. Kill Message 1's subprocess (~6s)
  2. Worker becomes free
  3. Executor auto-assigns Message 2 to freed worker
  4. NEW subprocess starts (11+ minutes)
  5. executor.shutdown() blocks waiting for Message 2
  6. Rebalance takes 11+ min → max.poll.interval.ms exceeded
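
For illustration, here is a minimal standalone reproduction of steps 3–5 with concurrent.futures; this is a sketch, not the actual consumer code:

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)  # concurrency=1, as above
executor.submit(time.sleep, 1)    # "Message 1": worker busy
executor.submit(time.sleep, 60)   # "Message 2": queued behind the busy worker

time.sleep(1.5)  # "Message 1" finishes; the executor immediately hands
                 # "Message 2" to the freed worker

start = time.monotonic()
executor.shutdown(wait=True)  # blocks until "Message 2" finishes too
print(f"shutdown blocked for {time.monotonic() - start:.0f}s")  # ~60s
```

Note that `executor.shutdown(cancel_futures=True)` (Python 3.9+) only cancels futures that are still queued; it cannot stop a task the executor has already started, which is why the fix skips the work inside the task itself.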

To validate this, I set KAFKA_MAX_PENDING_FUTURES to 0, and that killed the topic lag quickly in the US region yesterday. This works because arroyo's run_task_in_threads then has no future tasks pre-fetched. Instead of doing that, though, we can set a flag when shutdown is happening and simply skip any pre-fetched tasks that attempt to get processed during the shutdown rebalance.

This way we can keep KAFKA_MAX_PENDING_FUTURES non-zero and still get the benefit of prefetching tasks (and potentially still run our consumer with a concurrency > 1 if we need to in the future).
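
A minimal sketch of that flag-based skip, assuming a factory object shared between the message handler and the rebalance callbacks; apart from `_is_shutting_down`, the names here are illustrative rather than the actual launchpad code:

```python
import logging
import threading

logger = logging.getLogger(__name__)


class StrategyFactory:
    """Illustrative stand-in for the consumer's processing strategy factory."""

    def __init__(self) -> None:
        self._is_shutting_down = threading.Event()

    def on_partitions_revoked(self) -> None:
        # Set before killing active subprocesses so prefetched tasks bail out.
        self._is_shutting_down.set()

    def on_assignment_complete(self) -> None:
        self._is_shutting_down.clear()


def process_message(factory: StrategyFactory, artifact_id: str) -> None:
    if factory._is_shutting_down.is_set():
        # Raising before spawning the subprocess keeps executor.shutdown()
        # from blocking on a fresh 11-minute task; since the offset is never
        # committed, the message is redelivered after the rebalance.
        logger.warning(
            "Skipping prefetched message during rebalance, it will be reprocessed",
            extra={"artifact_id": artifact_id},
        )
        raise TimeoutError("Skipped during shutdown rebalance")
    # ... spawn the subprocess and wait for it as usual ...
```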

Comment on lines 105 to 115

- # Check if killed during rebalance
- pid = process.pid
- if pid is not None:
-     with registry_lock:
-         was_killed_by_rebalance = pid in factory._killed_during_rebalance
-         if was_killed_by_rebalance:
-             factory._killed_during_rebalance.discard(pid)
-
- if was_killed_by_rebalance:
-     # Wait for kill to complete, then don't commit offset
-     process.join(timeout=10)  # Give kill_active_processes time to finish
-     logger.warning(
-         "Process killed during rebalance, message will be reprocessed",
-         extra={"artifact_id": artifact_id},
-     )
-     raise TimeoutError("Subprocess killed during rebalance")
+ if factory._is_shutting_down:
+     logger.warning(
+         "Process killed during rebalance, message will be reprocessed",
+         extra={"artifact_id": artifact_id},
+     )
+     raise TimeoutError("Subprocess killed during rebalance")

  # Handle timeout (process still alive after full timeout)
  if process.is_alive():

Bug: Messages completing during rebalance are incorrectly marked as killed due to _is_shutting_down check, causing reprocessing.
Severity: CRITICAL | Confidence: High

🔍 Detailed Analysis

The code incorrectly raises a TimeoutError when a subprocess completes successfully (or naturally times out) during a Kafka rebalance, due to checking the global _is_shutting_down flag. This conflates processes actively killed by kill_active_processes() with those that finish while shutdown is merely in progress. As a result, messages that have already been processed successfully are marked as 'killed during rebalance' and reprocessed, violating exactly-once processing semantics and potentially leading to duplicate message processing.

💡 Suggested Fix

Reintroduce a mechanism, such as PID-based tracking, to explicitly differentiate between subprocesses that are actively killed during rebalance and those that complete naturally while _is_shutting_down is true. Only raise the 'killed during rebalance' TimeoutError if the process was indeed terminated by the shutdown sequence.
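
For contrast, here is a sketch of the kill-side bookkeeping that PID-based tracking implies; `registry_lock`, `_killed_during_rebalance`, and `kill_active_processes()` follow the diff above, while `_active_processes` is assumed for illustration:

```python
import multiprocessing
import threading

registry_lock = threading.Lock()


class Factory:
    def __init__(self) -> None:
        self._active_processes: dict[int, multiprocessing.Process] = {}
        self._killed_during_rebalance: set[int] = set()

    def kill_active_processes(self) -> None:
        with registry_lock:
            for pid, process in list(self._active_processes.items()):
                if process.is_alive():
                    process.terminate()
                    # Only PIDs we actually terminate are recorded, so a
                    # subprocess that finishes naturally while shutdown is in
                    # progress is never mistaken for one killed by the rebalance.
                    self._killed_during_rebalance.add(pid)
```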


"Process killed during rebalance, message will be reprocessed",
extra={"artifact_id": artifact_id},
)
raise TimeoutError("Subprocess killed during rebalance")
@NicoHinderling
Contributor Author


We can just use the flag instead of checking the PID set now.

@codecov

codecov bot commented Dec 5, 2025

Codecov Report

❌ Patch coverage is 10.00000% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.93%. Comparing base (1e64b6b) to head (9ffb606).
✅ All tests successful. No failed tests found.

Files with missing lines   Patch %   Lines
src/launchpad/kafka.py     10.00%    9 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #512      +/-   ##
==========================================
+ Coverage   80.91%   80.93%   +0.02%     
==========================================
  Files         164      164              
  Lines       14213    14209       -4     
  Branches     1501     1500       -1     
==========================================
  Hits        11500    11500              
+ Misses       2144     2140       -4     
  Partials      569      569              

☔ View full report in Codecov by Sentry.

@NicoHinderling
Contributor Author

Going with #513 instead for now, but keeping this PR in our back pocket for the reasons cited in #513's summary.
