Skip to content

input: splice queued retries ahead of in-flight fresh batches#432

Open
twmb wants to merge 1 commit into
mainfrom
read-ahead-retry-splicing
Open

input: splice queued retries ahead of in-flight fresh batches#432
twmb wants to merge 1 commit into
mainfrom
read-ahead-retry-splicing

Conversation

@twmb

@twmb twmb commented May 8, 2026

Copy link
Copy Markdown
Contributor

Summary

When auto_replay_nacks: true is set, a nacked batch is supposed to be re-delivered to the pipeline before any subsequent fresh batch. autoretry.List.Shift prioritises retries over fresh reads — but AsyncReader.loop reads ahead: after pushing batch N to its unbuffered transactions channel it spawns the ack-handler goroutine and immediately calls ReadBatch for batch N+1. By the time batch N errors and the ack-handler queues the retry, the next ReadBatch has already returned a fresh batch and pushed it onto the channel — so the retry ends up scheduled behind the read-ahead.

This breaks ordering for downstream consumers that rely on auto_replay_nacks for at-least-once delivery. With same-key records concentrated on a single source/destination partition (e.g. a Redpanda Connect kafka→kafka migrator), a partial output failure surfaces as same-key reordering at the destination, even though every layer "preserves order" in isolation. Reproduced from redpanda-data/connect#4387.

This PR fixes the race by splicing queued retries ahead of any fresh batch the AsyncReader has read but not yet handed off downstream.

Approach

autoretry.List grows a buffered (size-1, coalescing) retryNotify channel and a TryShiftRetry method:

  • wrapPendingAck does a non-blocking send on retryNotify whenever a retry is queued.
  • TryShiftRetry returns a queued retry without blocking and without dispatching a fresh read.

autoRetryInputBatched exposes both via unexported methods. airGapBatchReader forwards them, gated on the underlying BatchInput satisfying an unexported retryAwareBatchInput interface — so third-party BatchInput implementations are unaffected.

AsyncReader.loop type-asserts r.reader to an unexported retryAware interface (Go's structural typing means no internal/public package coupling). Readers that don't implement it get the previous behaviour exactly. When supported, each iteration prefers, in order:

  1. A queued retry (via TryShiftRetry, non-blocking).
  2. A previously deferred fresh batch.
  3. A new fresh read.

The channel send for the chosen batch races against retryNotify in a select — when the signal wins, the in-flight fresh batch is moved to a local deferred queue and the loop goes again.

Test plan

  • go test -count=10 -race ./internal/autoretry/ ./public/service/ ./internal/component/input/...
  • New TestRetryListNotifyAndTryShift in internal/autoretry/auto_retry_list_test.go pins the autoretry-level contract: retryNotify fires on a nack, doesn't fire on a successful ack, coalesces; TryShiftRetry returns false without dispatching a fresh read when the queue is empty.
  • New TestBatchAutoRetryReadAheadOrdering in public/service/input_auto_retry_batched_readahead_test.go is the integration regression: on the unfixed code it observes [A, B, A_retry] (read-ahead bug); with the fix it observes [A, A_retry, B].

Backward compatibility

  • BatchInput implementations that don't implement retryAwareBatchInput (i.e. anything not wrapped by AutoRetryNacksBatched) are routed through the original code paths verbatim — retryNotify is nil so that select case never fires, deferred stays empty, and the loop reduces to its previous shape.
  • Existing tests under ./public/service/, ./internal/autoretry/, and ./internal/component/input/... all still pass under -race.
  • One small latent issue tidied up while in the area: pendingAcks.Add(1) is now incremented after the push succeeds rather than before, fixing a miscount that could have held up shutdown if the push raced shutSig. The shadowed-err capture in the ack-handler goroutine is changed to a local := to avoid a pre-existing data race across iterations.

Notes for reviewers

  • The internal/component/input/interface.go change I'd ordinarily prefer (a public RetryAware companion to Async) was scoped down to an unexported interface in async_reader.go to keep the change isolated; happy to promote it to a public interface in a follow-up if you'd rather have it documented as part of the Async surface.
  • The retryNotifyChan / tryShiftRetry methods on autoRetryInputBatched are intentionally unexported and matched structurally — third-party BatchInput implementations cannot accidentally opt in or break the assertion.

When auto_replay_nacks is enabled, a nacked batch is supposed to be
re-delivered to the pipeline before any subsequent fresh batch. The
autoretry list's Shift already prioritises retries over fresh reads --
but AsyncReader's loop reads ahead: after pushing batch N to its
unbuffered transactions channel, it spawns the ack-handler goroutine
and immediately calls ReadBatch for batch N+1. By the time batch N
errors and the ack-handler queues the retry, ReadBatch has already
returned a fresh batch N+1 and pushed it onto the channel. The retry
ends up scheduled behind the read-ahead.

This breaks ordering for downstream consumers that rely on
auto_replay_nacks for at-least-once delivery -- a partial output
failure on a same-key partition surfaces as same-key reordering at the
destination, even though every layer "preserves order" in isolation.
Reproduced from connect#4387.

Fix: race the channel send against a retry-available signal. When the
signal wins, defer the in-flight fresh batch into a local queue and
loop. The next iteration prefers (in order):

  1. A queued retry (via TryShiftRetry, non-blocking).
  2. A previously deferred fresh batch.
  3. A new fresh read.

Plumbed through:

  - autoretry.List grows a buffered (size 1, coalescing) retryNotify
    channel and a TryShiftRetry method. wrapPendingAck non-blocking
    sends on retryNotify whenever a retry is queued.
  - autoRetryInputBatched exposes both via unexported methods.
  - airGapBatchReader forwards them, gated on the underlying
    BatchInput satisfying the unexported retryAwareBatchInput
    interface -- so third-party BatchInput implementations are
    unaffected.
  - AsyncReader.loop type-asserts r.reader to an unexported retryAware
    interface (Go structural typing means no internal/public package
    coupling). Readers that don't implement it get the previous
    behaviour exactly.

Tests:

  - TestRetryListNotifyAndTryShift pins the autoretry-level contract:
    notify fires on nack, doesn't fire on success, coalesces;
    TryShiftRetry returns false without dispatching a fresh read when
    the queue is empty.
  - TestBatchAutoRetryReadAheadOrdering is the integration regression:
    on the unfixed code it observes [A, B, A_retry] (read-ahead bug);
    with the fix it observes [A, A_retry, B].

Refs redpanda-data/connect#4387.
@Jeffail

Jeffail commented Jun 10, 2026

Copy link
Copy Markdown
Collaborator

Thank you for the thorough write-up, Travis — the read-ahead diagnosis is precise and the mechanism is well documented. I'd like to push back on the framing before we go further, as I think the underlying ordering problem survives this change.

The fix only closes a window that is rarely open when a nack arrives. The splice operates at the input → consumer boundary, preempting a fresh batch that has been read but not yet handed across the (unbuffered) transactions channel. But the standard pipeline processor pulls the next input transaction the instant it dispatches the current one downstream — it does not await the ack (internal/pipeline/processor.go, the len(resultBatches) == 1 path continues immediately after the send). A nack originates further downstream, at the output, strictly later. So in a normal input → pipeline → output stream, batch N+1 has already crossed the input boundary by the time N is nacked, and there is nothing left to defer. The splice meaningfully fires only when the input's immediate consumer is itself sequential (e.g. an output read directly with max_in_flight: 1).

That makes the change dominated in every topology. Where pipelining is present, it does not fire. Where the consumer is sequential enough for it to fire, max_in_flight: 1 plus output-side retry already provides a genuine ordering guarantee rather than a probabilistic improvement.

More fundamentally, auto_replay_nacks is an at-least-once shim, not an ordering mechanism. It adopts the batch and allows the source to advance — which is the whole point — so once two batches are concurrently in flight downstream, a nack-then-retry of the first necessarily lands after the second. That is unavoidable without serialising to a single in-flight batch, which would defeat the component. Adding an ordering property here risks signalling that auto_replay_nacks is order-safe when it structurally is not.

For the scenario behind redpanda-data/connect#4387, I'd suggest the correct layer is the output: max_in_flight: 1 with retry-in-place / backpressure preserves order with a real guarantee (a DLQ is the alternative when one is willing to trade ordering of the failed item for throughput). For a Kafka source specifically, native uncommitted-offset redelivery re-delivers in order without this shim at all.

I'd be glad to be shown a concrete topology where the second batch is still parked at the input boundary at nack time — that would change my reading. Absent that, my inclination is to steer the original issue toward an output-level solution rather than merge this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants