fix(kafka): eliminate Resume timeout race in batch consumer pause/heartbeat #1313
absorbb wants to merge 2 commits into
…rtbeat

resume() and the pause-heartbeat goroutine coordinate via an unbuffered resumeChannel, so a resume signal can only be delivered while the heartbeat is parked in its select. The heartbeat loop runs restartConsumer synchronously on non-retriable ReadMessage errors; restartConsumer blocks for KafkaSessionTimeoutMs+15s per init attempt (default 60s baseline) and loops if init keeps failing. With unhealthy brokers this stalls the heartbeat past KafkaMaxPollIntervalMs (5 min), triggering `failed to resume kafka consumer: Resume timeout` in resume() and the analogous failure in _unpause().

Two defense-in-depth fixes:

- Buffer resumeChannel to size 1. resume() can deposit the signal even while the heartbeat is mid-iteration; the heartbeat picks it up on its next pass. pause() drains any stale signal on entry so a new cycle doesn't inherit one. The signal handler also re-resumes the current consumer in case it was replaced between resume()'s call and the heartbeat picking up the signal.
- Run restartConsumer asynchronously when invoked from the heartbeat via new restartConsumerAsync, guarded by an atomic restarting flag to prevent overlapping restart attempts. The synchronous entrypoint is preserved for other call sites that must block until the new consumer is up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
    //buffered size 1: resume() can deposit the signal even if the
    //heartbeat goroutine is mid-iteration (e.g. blocked in
    //restartConsumer); the heartbeat picks it up on the next pass.
    resumeChannel: make(chan struct{}, 1),
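For readers unfamiliar with the pattern, the buffered-signal idea in this hunk can be sketched in isolation. This is a minimal illustration, not the code in the PR: the `signal` type and its `deposit` and `drain` methods are hypothetical names, and the non-blocking `select` with `default` is an assumption about how the send and the drain in pause() might be written.

```go
package main

import "fmt"

// signal is a hypothetical stand-in for the consumer's resumeChannel.
type signal struct {
	ch chan struct{}
}

func newSignal() *signal {
	// Capacity 1: one pending signal can wait for the receiver.
	return &signal{ch: make(chan struct{}, 1)}
}

// deposit leaves at most one pending signal and never blocks.
// It reports false when a signal was already pending.
func (s *signal) deposit() bool {
	select {
	case s.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// drain discards a stale pending signal, if any, without blocking.
// It reports whether a signal was discarded.
func (s *signal) drain() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

func main() {
	s := newSignal()
	fmt.Println(s.deposit()) // receiver is busy, signal is stored anyway
	fmt.Println(s.deposit()) // a second signal is coalesced
	fmt.Println(s.drain())   // a new pause cycle clears the stale signal
	fmt.Println(s.drain())   // nothing left to clear
}
```

With capacity 1 the sender never has to wait for the receiver to reach its select, which is the property the commit message relies on; the cost is that the send no longer proves the receiver has seen the signal.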
Making resumeChannel buffered removes the rendezvous semantics that _unpause() currently relies on in pauseOrSuspend() (_unpause(); consumer.Close(); bc.consumer.Store(nil)). With an unbuffered channel, _unpause() blocks until the pause heartbeat loop receives the signal; now it can return immediately while the loop is still inside ReadMessage(). Closing the consumer in that window can produce a non-timeout error in the pause loop and trigger restartConsumerAsync(), which can recreate a consumer right after we intended to suspend it. This looks like a user-visible regression (suspend may turn into restart churn).
Can we keep non-blocking resume() behavior without weakening _unpause() synchronization (e.g. separate channels/paths for resume vs suspend-ack, or an explicit ack/wait primitive for suspend)?
Good catch — fixed in b45169a with the separate-channel approach you suggested.
resume() keeps the buffered resumeChannel (non-blocking deposit, which is what fixed the Resume timeout). The suspend path now uses a dedicated unbuffered stopChannel: _unpause() sends on it, so the send only completes once the heartbeat has received it and is leaving its loop — i.e. no longer inside ReadMessage(). That restores the rendezvous pauseOrSuspend relies on, so consumer.Close() no longer races an active ReadMessage and can't trip restartConsumerAsync() into recreating the consumer we just suspended.
The heartbeat's stopChannel case only sets paused=false and breaks — no defensive Resume, since the caller is about to close the consumer anyway.
Buffering resumeChannel fixed the Resume timeout but weakened _unpause(): the buffered send returned immediately, so pauseOrSuspend could Close the consumer while the heartbeat was still in ReadMessage. The resulting non-retriable error tripped restartConsumerAsync and recreated a consumer we intended to suspend.

Add a dedicated unbuffered stopChannel for the suspend path. _unpause() now sends on it, so the send only completes once the heartbeat has left its loop (no longer in ReadMessage), restoring the rendezvous Close relies on. resume() keeps using the buffered resumeChannel, preserving the non-blocking resume that fixed the timeout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
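The rendezvous property this commit restores can be demonstrated with a small standalone sketch. It does not reproduce the PR's code: `heartbeat`, `suspend`, and the `inRead` flag are hypothetical, and the simulated read is just a sleep. The point is only that a send on an unbuffered channel cannot complete while the receiving goroutine is busy outside its select.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeat is a hypothetical stand-in for the pause-heartbeat loop.
// inRead is true only while the loop is inside its simulated read.
func heartbeat(stop <-chan struct{}, inRead *atomic.Bool, done chan<- struct{}) {
	defer close(done)
	ticker := time.NewTicker(2 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return // leave the loop; no further reads happen
		case <-ticker.C:
			inRead.Store(true)
			time.Sleep(5 * time.Millisecond) // simulated ReadMessage
			inRead.Store(false)
		}
	}
}

// suspend sends the stop signal and reports whether the heartbeat was
// still inside a read when the send returned.
func suspend() bool {
	stop := make(chan struct{}) // unbuffered: the send is a rendezvous
	done := make(chan struct{})
	var inRead atomic.Bool
	go heartbeat(stop, &inRead, done)

	time.Sleep(10 * time.Millisecond) // let a few iterations run
	stop <- struct{}{}                // blocks until the loop is in select
	reading := inRead.Load()          // safe point to close the consumer
	<-done
	return reading
}

func main() {
	fmt.Println("heartbeat in read when send returned:", suspend())
}
```

Because the receive can only happen in the select, the flag is always clear by the time the send returns, which is why closing the consumer right after the send does not race an in-flight read.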
Reviewed the changes in bulker/bulkerapp/app/abstract_batch_consumer.go, focusing on pause/resume/suspend and restart interactions.
The separate suspend stop-signal path and async restart guard address the previously reported race/timeout behavior, and I did not find additional actionable correctness or security issues in this patch.
Summary
Two defense-in-depth fixes for the `failed to resume kafka consumer: Resume timeout` error seen on the retry consumer paths in bulkerapp (e.g. `in.id.<workspace>.m.retry.t._all_`).

- Buffer `resumeChannel` to size 1 so `resume()` (and `_unpause()`) can deposit the signal even when the pause heartbeat goroutine is mid-iteration. `pause()` drains any stale signal on entry, and the signal handler defensively re-resumes the current consumer in case it was replaced between `resume()`'s `consumer.Resume(...)` call and the heartbeat actually picking up the signal.
- Run `restartConsumer` asynchronously when invoked from the heartbeat via a new `restartConsumerAsync` helper, guarded by an `atomic.Bool` to prevent overlapping restart attempts. The synchronous `restartConsumer` entrypoint is preserved for the other call sites (`pauseKafkaConsumer`, etc.) that need to block until the new consumer is up.

Why this fixes the symptom

`resume()` and the pause-heartbeat goroutine coordinate via `resumeChannel`. Before this PR the channel was unbuffered, so a send only succeeds when the heartbeat is parked in its `select`. The heartbeat loop calls `restartConsumer` synchronously on non-retriable `ReadMessage` errors; `restartConsumer` blocks for `KafkaSessionTimeoutMs + 15s` per init attempt (default 60 s baseline) and loops if init keeps failing. With unhealthy brokers / network the heartbeat is starved past `KafkaMaxPollIntervalMs` (default 5 min), and `resume()` times out.

The same starvation also explains the sibling `failed to unpause kafka consumer.` from `_unpause()`.

With the buffered channel, `resume()` deposits the signal immediately, calls `consumer.Resume(partitions)` on the current consumer, and returns. The heartbeat picks up the signal on its next pass (whenever `restartConsumer` finishes, or on its next ticker tick). With the async restart, the heartbeat no longer parks itself in `restartConsumer` at all: restart proceeds in the background, the heartbeat keeps cycling, and the new consumer is brought up paused by `rebalanceCallback` (which checks `bc.paused`) as before.

Op notes

- … `resume()` is no longer reachable.
- `restartConsumer` itself is unchanged; only the call site in the heartbeat loop is rerouted through `restartConsumerAsync`. Other callers (e.g. `pauseKafkaConsumer` failure path) still run it synchronously.
- The `restarting atomic.Bool` field is unexported and only consulted by `restartConsumerAsync`; piling-up overlapping restarts is suppressed (subsequent async calls are no-ops while one is in flight).

Test plan

- `go vet ./bulkerapp/...` passes (verified locally)
- … `Resume timeout` and no `failed to unpause` in logs after recovery
- … `m.retry.t._all_`) resumes cleanly after a forced restart
- … `restartConsumer` in flight per consumer)

🤖 Generated with Claude Code
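The single-flight guard described for `restartConsumerAsync` can be sketched as follows. This is an illustration under stated assumptions rather than the PR's implementation: the `restarter` type, its `restart` callback, and the `WaitGroup` used to make the example deterministic are all hypothetical. Only the use of an `atomic.Bool` as the in-flight flag comes from the description above; using `CompareAndSwap` to claim it is one plausible way to do so.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// restarter is a hypothetical stand-in for the batch consumer.
type restarter struct {
	restarting atomic.Bool    // true while a background restart runs
	wg         sync.WaitGroup // lets the example wait for completion
	restart    func()         // the blocking restart routine
}

// restartAsync runs restart in the background unless one is already in
// flight. It reports whether this call started a restart.
func (r *restarter) restartAsync() bool {
	// Only the caller that flips false -> true may start a restart.
	if !r.restarting.CompareAndSwap(false, true) {
		return false
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer r.restarting.Store(false) // runs before wg.Done
		r.restart()
	}()
	return true
}

// demo starts one restart, tries a second while the first is blocked,
// then tries again after the first has finished.
func demo() (first, second, third bool) {
	release := make(chan struct{})
	r := &restarter{restart: func() { <-release }}

	first = r.restartAsync()
	second = r.restartAsync() // first is still blocked on release
	close(release)
	r.wg.Wait()

	third = r.restartAsync() // flag was cleared, so this one starts
	r.wg.Wait()
	return first, second, third
}

func main() {
	fmt.Println(demo())
}
```

The caller never blocks on the restart itself, so a loop that calls `restartAsync` on every failed read keeps cycling while at most one restart is in progress.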