Add is_shutting_down flag for arroyo rebalance case #512
base: main
Conversation
```python
# Check if killed during rebalance
pid = process.pid
if pid is not None:
    with registry_lock:
        was_killed_by_rebalance = pid in factory._killed_during_rebalance
        if was_killed_by_rebalance:
            factory._killed_during_rebalance.discard(pid)

if was_killed_by_rebalance:
    # Wait for kill to complete, then don't commit offset
    process.join(timeout=10)  # Give kill_active_processes time to finish
    logger.warning(
        "Process killed during rebalance, message will be reprocessed",
        extra={"artifact_id": artifact_id},
    )
    raise TimeoutError("Subprocess killed during rebalance")
if factory._is_shutting_down:
    logger.warning(
        "Process killed during rebalance, message will be reprocessed",
        extra={"artifact_id": artifact_id},
    )
    raise TimeoutError("Subprocess killed during rebalance")

# Handle timeout (process still alive after full timeout)
if process.is_alive():
```
Bug: Messages completing during rebalance are incorrectly marked as killed due to the `_is_shutting_down` check, causing reprocessing.
Severity: CRITICAL | Confidence: High
🔍 Detailed Analysis
The code incorrectly raises a TimeoutError when a subprocess completes successfully (or naturally times out) during a Kafka rebalance, due to checking the global _is_shutting_down flag. This conflates processes actively killed by kill_active_processes() with those that finish while shutdown is merely in progress. As a result, messages that have already been processed successfully are marked as 'killed during rebalance' and reprocessed, violating exactly-once processing semantics and potentially leading to duplicate message processing.
💡 Suggested Fix
Reintroduce a mechanism, such as PID-based tracking, to explicitly differentiate between subprocesses that are actively killed during rebalance and those that complete naturally while _is_shutting_down is true. Only raise the 'killed during rebalance' TimeoutError if the process was indeed terminated by the shutdown sequence.
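A minimal sketch of what that PID-based tracking could look like, reusing the `_killed_during_rebalance` set, `_is_shutting_down` flag, and `kill_active_processes()` names that appear in the diff and analysis above; the `ProcessRegistry` class, its `_active` set, and the `was_killed_by_rebalance()` helper are hypothetical illustrations, not the PR's actual code:

```python
import threading
from multiprocessing import Process


class ProcessRegistry:
    """Hypothetical sketch: only PIDs we explicitly terminate during a rebalance
    land in _killed_during_rebalance, so a subprocess that finishes on its own
    while shutdown is in progress is never mistaken for a killed one."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active: set[Process] = set()
        self._killed_during_rebalance: set[int] = set()
        self._is_shutting_down = False

    def kill_active_processes(self) -> None:
        with self._lock:
            self._is_shutting_down = True
            for process in self._active:
                if process.is_alive() and process.pid is not None:
                    # Record the PID before terminating so the consumer can
                    # attribute the failure to the rebalance rather than a crash.
                    self._killed_during_rebalance.add(process.pid)
                    process.terminate()

    def was_killed_by_rebalance(self, process: Process) -> bool:
        pid = process.pid
        if pid is None:
            return False
        with self._lock:
            if pid in self._killed_during_rebalance:
                self._killed_during_rebalance.discard(pid)
                return True
            # The process finished naturally, even if _is_shutting_down is set.
            return False
```

The consumer would then raise the rebalance `TimeoutError` only when `was_killed_by_rebalance()` returns `True`, rather than whenever `_is_shutting_down` is set.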
Location: src/launchpad/kafka.py#L104-L115
| "Process killed during rebalance, message will be reprocessed", | ||
| extra={"artifact_id": artifact_id}, | ||
| ) | ||
| raise TimeoutError("Subprocess killed during rebalance") |
we can just use the flag instead of checking the pid set now
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #512      +/-   ##
==========================================
+ Coverage   80.91%   80.93%   +0.02%
==========================================
  Files         164      164
  Lines       14213    14209       -4
  Branches     1501     1500       -1
==========================================
  Hits        11500    11500
+ Misses       2144     2140       -4
  Partials      569      569
```

☔ View full report in Codecov by Sentry.
Going with #513 instead for now, but keeping this PR in our back pocket for the reasons cited in #513's summary.
After merging #504 we noticed that there were still cases of the poll interval timeout happening. Upon further investigation, we realized this is happening:
To validate this, I set `KAFKA_MAX_PENDING_FUTURES` to 0 and that killed the topic lag quickly on the US region yesterday. This works because it means that arroyo's `run_task_in_threads` won't have any future tasks pre-fetched. Instead of doing that, though, we can have a flag set when shutdown is happening and then just skip any future tasks that attempt to get processed during the shutdown rebalance.

This way we can keep `KAFKA_MAX_PENDING_FUTURES` non-zero and still have the benefit of prefetching tasks (and potentially still run our consumer with a concurrency > 1 if we needed to in the future).
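A rough sketch of that flag-based skip, using illustrative names only (it is not the PR's implementation and does not touch arroyo's real `run_task_in_threads` strategy): the shared flag is set when the shutdown rebalance begins, and any prefetched task that runs afterwards fails fast, so its offset is never committed and the message is redelivered after the rebalance.

```python
import threading


def handle(payload: bytes) -> None:
    """Placeholder for the real per-message work."""


class ShutdownAwareProcessor:
    """Illustrative sketch: prefetched tasks consult a shared event and skip
    work once a rebalance-driven shutdown has begun, so their offsets are not
    committed and the messages are reprocessed after the rebalance."""

    def __init__(self) -> None:
        self._is_shutting_down = threading.Event()

    def on_shutdown(self) -> None:
        # Called from the partition-revocation / shutdown path.
        self._is_shutting_down.set()

    def process(self, payload: bytes) -> None:
        if self._is_shutting_down.is_set():
            # Failing fast mirrors the PR's approach: the message is treated as
            # failed, its offset is never committed, and it is redelivered.
            raise TimeoutError("Subprocess killed during rebalance")
        handle(payload)
```

The point of this shape is that prefetching stays enabled, while any work that starts after the flag flips is simply retried once the rebalance settles.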