input: splice queued retries ahead of in-flight fresh batches#432
Conversation
When auto_replay_nacks is enabled, a nacked batch is supposed to be
re-delivered to the pipeline before any subsequent fresh batch. The
autoretry list's Shift already prioritises retries over fresh reads --
but AsyncReader's loop reads ahead: after pushing batch N to its
unbuffered transactions channel, it spawns the ack-handler goroutine
and immediately calls ReadBatch for batch N+1. By the time batch N
errors and the ack-handler queues the retry, ReadBatch has already
returned a fresh batch N+1 and pushed it onto the channel. The retry
ends up scheduled behind the read-ahead.
This breaks ordering for downstream consumers that rely on
auto_replay_nacks for at-least-once delivery -- a partial output
failure on a same-key partition surfaces as same-key reordering at the
destination, even though every layer "preserves order" in isolation.
Reproduced from connect#4387.
Fix: race the channel send against a retry-available signal. When the
signal wins, defer the in-flight fresh batch into a local queue and
loop. The next iteration prefers (in order):
1. A queued retry (via TryShiftRetry, non-blocking).
2. A previously deferred fresh batch.
3. A new fresh read.
Plumbed through:
- autoretry.List grows a buffered (size 1, coalescing) retryNotify
channel and a TryShiftRetry method. wrapPendingAck non-blocking
sends on retryNotify whenever a retry is queued.
- autoRetryInputBatched exposes both via unexported methods.
- airGapBatchReader forwards them, gated on the underlying
BatchInput satisfying the unexported retryAwareBatchInput
interface -- so third-party BatchInput implementations are
unaffected.
- AsyncReader.loop type-asserts r.reader to an unexported retryAware
interface (Go structural typing means no internal/public package
coupling). Readers that don't implement it get the previous
behaviour exactly.
Tests:
- TestRetryListNotifyAndTryShift pins the autoretry-level contract:
notify fires on nack, doesn't fire on success, coalesces;
TryShiftRetry returns false without dispatching a fresh read when
the queue is empty.
- TestBatchAutoRetryReadAheadOrdering is the integration regression:
on the unfixed code it observes [A, B, A_retry] (read-ahead bug);
with the fix it observes [A, A_retry, B].
Refs redpanda-data/connect#4387.
|
Thank you for the thorough write-up, Travis — the read-ahead diagnosis is precise and the mechanism is well documented. I'd like to push back on the framing before we go further, as I think the underlying ordering problem survives this change. The fix only closes a window that is rarely open when a nack arrives. The splice operates at the input → consumer boundary, preempting a fresh batch that has been read but not yet handed across the (unbuffered) That makes the change dominated in every topology. Where pipelining is present, it does not fire. Where the consumer is sequential enough for it to fire, More fundamentally, For the scenario behind redpanda-data/connect#4387, I'd suggest the correct layer is the output: I'd be glad to be shown a concrete topology where the second batch is still parked at the input boundary at nack time — that would change my reading. Absent that, my inclination is to steer the original issue toward an output-level solution rather than merge this. |
Summary
When
auto_replay_nacks: trueis set, a nacked batch is supposed to be re-delivered to the pipeline before any subsequent fresh batch.autoretry.List.Shiftprioritises retries over fresh reads — butAsyncReader.loopreads ahead: after pushing batch N to its unbuffered transactions channel it spawns the ack-handler goroutine and immediately callsReadBatchfor batch N+1. By the time batch N errors and the ack-handler queues the retry, the nextReadBatchhas already returned a fresh batch and pushed it onto the channel — so the retry ends up scheduled behind the read-ahead.This breaks ordering for downstream consumers that rely on
auto_replay_nacksfor at-least-once delivery. With same-key records concentrated on a single source/destination partition (e.g. a Redpanda Connect kafka→kafka migrator), a partial output failure surfaces as same-key reordering at the destination, even though every layer "preserves order" in isolation. Reproduced from redpanda-data/connect#4387.This PR fixes the race by splicing queued retries ahead of any fresh batch the
AsyncReaderhas read but not yet handed off downstream.Approach
autoretry.Listgrows a buffered (size-1, coalescing)retryNotifychannel and aTryShiftRetrymethod:wrapPendingAckdoes a non-blocking send onretryNotifywhenever a retry is queued.TryShiftRetryreturns a queued retry without blocking and without dispatching a fresh read.autoRetryInputBatchedexposes both via unexported methods.airGapBatchReaderforwards them, gated on the underlyingBatchInputsatisfying an unexportedretryAwareBatchInputinterface — so third-partyBatchInputimplementations are unaffected.AsyncReader.looptype-assertsr.readerto an unexportedretryAwareinterface (Go's structural typing means no internal/public package coupling). Readers that don't implement it get the previous behaviour exactly. When supported, each iteration prefers, in order:TryShiftRetry, non-blocking).The channel send for the chosen batch races against
retryNotifyin aselect— when the signal wins, the in-flight fresh batch is moved to a localdeferredqueue and the loop goes again.Test plan
go test -count=10 -race ./internal/autoretry/ ./public/service/ ./internal/component/input/...TestRetryListNotifyAndTryShiftininternal/autoretry/auto_retry_list_test.gopins the autoretry-level contract:retryNotifyfires on a nack, doesn't fire on a successful ack, coalesces;TryShiftRetryreturns false without dispatching a fresh read when the queue is empty.TestBatchAutoRetryReadAheadOrderinginpublic/service/input_auto_retry_batched_readahead_test.gois the integration regression: on the unfixed code it observes[A, B, A_retry](read-ahead bug); with the fix it observes[A, A_retry, B].Backward compatibility
BatchInputimplementations that don't implementretryAwareBatchInput(i.e. anything not wrapped byAutoRetryNacksBatched) are routed through the original code paths verbatim —retryNotifyisnilso that select case never fires,deferredstays empty, and the loop reduces to its previous shape../public/service/,./internal/autoretry/, and./internal/component/input/...all still pass under-race.pendingAcks.Add(1)is now incremented after the push succeeds rather than before, fixing a miscount that could have held up shutdown if the push racedshutSig. The shadowed-errcapture in the ack-handler goroutine is changed to a local:=to avoid a pre-existing data race across iterations.Notes for reviewers
internal/component/input/interface.gochange I'd ordinarily prefer (a publicRetryAwarecompanion toAsync) was scoped down to an unexported interface inasync_reader.goto keep the change isolated; happy to promote it to a public interface in a follow-up if you'd rather have it documented as part of theAsyncsurface.retryNotifyChan/tryShiftRetrymethods onautoRetryInputBatchedare intentionally unexported and matched structurally — third-partyBatchInputimplementations cannot accidentally opt in or break the assertion.