36 changes: 16 additions & 20 deletions src/launchpad/kafka.py
@@ -56,14 +56,14 @@ def _process_in_subprocess(decoded_message: Any, log_queue: multiprocessing.Queu
 def _kill_process(process: multiprocessing.Process, artifact_id: str) -> None:
     """Gracefully terminate, then force kill a subprocess."""
     process.terminate()
-    process.join(timeout=5)
+    process.join(timeout=1)
     if process.is_alive():
         logger.warning(
             "Process did not terminate gracefully, force killing",
             extra={"artifact_id": artifact_id, "pid": process.pid},
         )
         process.kill()
-        process.join(timeout=1)  # Brief timeout to reap zombie, avoid infinite block
+        process.join(timeout=0.5)
         if process.is_alive():
             logger.error(
                 "Process could not be killed, may become zombie",
@@ -79,6 +79,9 @@ def process_kafka_message_with_service(
     factory: LaunchpadStrategyFactory,
 ) -> Any:
     """Process a Kafka message by spawning a fresh subprocess with timeout protection."""
+    if factory._is_shutting_down:
+        raise TimeoutError("Skipping message processing - shutdown in progress")
+
     timeout = int(os.getenv("KAFKA_TASK_TIMEOUT_SECONDS", "720"))  # 12 minutes default
 
     try:
@@ -101,21 +104,12 @@ def process_kafka_message_with_service(
         process.join(timeout=timeout)
 
         # Check if killed during rebalance
-        pid = process.pid
-        if pid is not None:
-            with registry_lock:
-                was_killed_by_rebalance = pid in factory._killed_during_rebalance
-                if was_killed_by_rebalance:
-                    factory._killed_during_rebalance.discard(pid)
-
-        if was_killed_by_rebalance:
-            # Wait for kill to complete, then don't commit offset
-            process.join(timeout=10)  # Give kill_active_processes time to finish
-            logger.warning(
-                "Process killed during rebalance, message will be reprocessed",
-                extra={"artifact_id": artifact_id},
-            )
-            raise TimeoutError("Subprocess killed during rebalance")
Contributor Author: we can just use the flag instead of checking the pid set now

+        if factory._is_shutting_down:
+            logger.warning(
+                "Process killed during rebalance, message will be reprocessed",
+                extra={"artifact_id": artifact_id},
+            )
+            raise TimeoutError("Subprocess killed during rebalance")
 
         # Handle timeout (process still alive after full timeout)
         if process.is_alive():
Comment on lines 105 to 115
Bug: Messages completing during rebalance are incorrectly marked as killed due to _is_shutting_down check, causing reprocessing.
Severity: CRITICAL | Confidence: High

🔍 Detailed Analysis

The code incorrectly raises a TimeoutError when a subprocess completes successfully (or naturally times out) during a Kafka rebalance, due to checking the global _is_shutting_down flag. This conflates processes actively killed by kill_active_processes() with those that finish while shutdown is merely in progress. As a result, messages that have already been processed successfully are marked as 'killed during rebalance' and reprocessed, violating exactly-once processing semantics and potentially leading to duplicate message processing.

💡 Suggested Fix

Reintroduce a mechanism, such as PID-based tracking, to explicitly differentiate between subprocesses that are actively killed during rebalance and those that complete naturally while _is_shutting_down is true. Only raise the 'killed during rebalance' TimeoutError if the process was indeed terminated by the shutdown sequence.
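A minimal sketch of that PID-based differentiation follows; it is not part of this PR, and the RebalanceKillRegistry class, its method names, and the wiring into the factory are illustrative assumptions only.

from __future__ import annotations

import threading


class RebalanceKillRegistry:
    """Illustrative helper: records which PIDs kill_active_processes() actually
    terminated, so a subprocess killed during rebalance can be told apart from
    one that simply finished while _is_shutting_down was True."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._killed_pids: set[int] = set()

    def mark_killed(self, pid: int) -> None:
        # Would be called by kill_active_processes() just before _kill_process().
        with self._lock:
            self._killed_pids.add(pid)

    def pop_killed(self, pid: int | None) -> bool:
        # Pop-and-test: True only if this exact PID was targeted by the rebalance kill.
        if pid is None:
            return False
        with self._lock:
            if pid in self._killed_pids:
                self._killed_pids.discard(pid)
                return True
            return False


# Assumed usage in process_kafka_message_with_service, after process.join(timeout=timeout):
#
#     if registry.pop_killed(process.pid):
#         raise TimeoutError("Subprocess killed during rebalance")
#     # A subprocess that completed on its own while _is_shutting_down is True
#     # falls through, so its offset can still be committed.

This would keep the shutdown flag for rejecting new work (the check added at the top of process_kafka_message_with_service) while restricting the no-commit path to processes the rebalance actually terminated.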


@@ -201,7 +195,7 @@ def poll(self) -> None:
         self._inner.poll()
 
     def close(self) -> None:
-        # Kill all active subprocesses BEFORE closing inner strategy
+        self._factory._is_shutting_down = True
         self._factory.kill_active_processes()
         self._inner.close()
 
@@ -264,7 +258,7 @@ def __init__(
 
         self._active_processes: dict[int, tuple[multiprocessing.Process, str]] = {}
         self._processes_lock = threading.Lock()
-        self._killed_during_rebalance: set[int] = set()
+        self._is_shutting_down = False
 
         self.concurrency = concurrency
         self.max_pending_futures = max_pending_futures
@@ -286,7 +280,6 @@ def kill_active_processes(self) -> None:
         )
         for pid, (process, artifact_id) in list(self._active_processes.items()):
             if process.is_alive():
-                self._killed_during_rebalance.add(pid)
                 logger.info("Terminating subprocess with PID %d", pid)
                 _kill_process(process, artifact_id)
         self._active_processes.clear()
@@ -297,6 +290,9 @@ def create_with_partitions(
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         """Create the processing strategy chain."""
+        # Reset shutdown flag when creating new strategy after rebalance
+        self._is_shutting_down = False
+
         next_step: ProcessingStrategy[Any] = CommitOffsets(commit)
 
         processing_function = partial(