
Bulker: support multi-partition retry topics #1348

Merged: absorbb merged 8 commits into newjitsu from retry-consumer-multi-partition, Jun 5, 2026

@absorbb absorbb commented Jun 5, 2026

Summary

Allows retry topics (both per-destination and the shared destination-messages-retry) to have multiple partitions, consumed in parallel by multiple bulker instances.

  • topic_manager: for a retry topic with N partitions, a retry consumer is started on N consecutive shards: (hash+i) % shardsCount for i in 0..N-1 (capped at shardsCount). Single-partition topics map to exactly the same shard as before, so existing topic→shard placement is unchanged. The arithmetic is done in uint64 to avoid a uint32 overflow corner case that could map two partitions to the same shard for non-power-of-2 shard counts. The "retry topic must have a single partition" restriction is removed (batch topics keep it).
  • abstract_batch_consumer: every retry consumer now discovers its partition from the consumer group assignment. The manual Assign(partition = INSTANCE_INDEX) previously used for the shared retry topic is removed — partition distribution is handled by group rebalancing for all retry topics uniformly.
  • retry_consumer: handle a rebalance happening mid-batch. A batch must contain messages from a single partition (that of the batch's first message) so that its offsets can be committed atomically. When a message from a different partition arrives, the consumer seeks back to it so it is re-read later, commits the messages already consumed, and ends the run. The next run queries watermark and committed offsets for whichever partition the consumer owns by then.
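
As an illustration of the shard fan-out described in the topic_manager item, here is a minimal Go sketch. The function name retryShards and its parameters are hypothetical and not taken from the patch; only the formula (hash+i) % shardsCount, the cap at shardsCount, and the uint64 widening come from the description above. Returning nothing for a non-positive partition count is an assumption of this sketch.

```go
package main

// retryShards returns the shards that would each run one retry consumer for
// a topic with the given number of partitions: (hash+i) % shardsCount for
// i in 0..N-1, capped at shardsCount. Name and signature are hypothetical.
func retryShards(topicHash uint32, partitions, shardsCount int) []int {
	if partitions <= 0 || shardsCount <= 0 {
		return nil // assumption of this sketch: nothing to place
	}
	n := partitions
	if n > shardsCount {
		n = shardsCount
	}
	shards := make([]int, 0, n)
	for i := 0; i < n; i++ {
		// Widen to uint64 before adding. In uint32, hash+i can wrap past
		// 2^32, and because 2^32 is not a multiple of a non-power-of-2
		// shard count, the wrapped value can land on a shard that an
		// earlier partition already took.
		shard := (uint64(topicHash) + uint64(i)) % uint64(shardsCount)
		shards = append(shards, int(shard))
	}
	return shards
}
```

For example, with topicHash = 4294967295 and 3 shards, uint32 arithmetic would place partitions 0 and 1 both on shard 0 (the sum wraps to 0), while the widened arithmetic yields shards 0, 1 and 2. With partitions = 1 the result is the single shard hash % shardsCount, so single-partition placement does not move.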

Notes / known degraded modes

  • A consumer must own exactly 1 partition to consume (len(assignment) != 1 → assignment_error). A transient multi-assignment (a peer instance down or suspended past the session timeout) or a zero-assignment (more group members than partitions) skips the run with an error until the group stabilizes. Offsets are committed transactionally, so nothing is lost; retries are only delayed.
  • Partition count of a retry topic should not exceed SHARDS_COUNT.
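
The "exactly 1 partition" rule from the notes above can be sketched as follows. This is an illustration only: ownedPartition and errAssignment are hypothetical names, and the assignment is modeled as a plain slice of partition numbers rather than the Kafka client's own type.

```go
package main

import (
	"errors"
	"fmt"
)

// errAssignment stands in for the assignment_error condition named in the
// PR description; the variable itself is hypothetical.
var errAssignment = errors.New("assignment_error")

// ownedPartition returns the single partition a retry consumer may read
// from, or an error when the group assignment is empty or has more than
// one partition. In that case the run is skipped and retried later.
func ownedPartition(assignment []int32) (int32, error) {
	if len(assignment) != 1 {
		return -1, fmt.Errorf("%w: expected exactly 1 assigned partition, got %d",
			errAssignment, len(assignment))
	}
	return assignment[0], nil
}
```

A caller would typically log the error and return from the run without consuming, leaving committed offsets untouched.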

Test plan

  • go build ./app/ passes
  • Shard fan-out simulated for shards∈{1,2,3,12,16} × partitions∈{0,1,3,5,16,20} incl. wraparound and uint32-overflow hashes: every partition gets exactly one shard, single-partition placement identical to current behavior
  • Deploy to staging: create a 3-partition per-destination retry topic, verify 3 instances each consume one partition and messages drain after rebalances

🤖 Generated with Claude Code

- topic_manager: for retry topics start a consumer on a separate shard
  for each partition: shards (hash+i) % shardsCount, i in 0..partitions-1.
  Single-partition topics keep the same shard as before. Remove the
  single-partition restriction for retry consumers.
- abstract_batch_consumer: all retry consumers discover their partition
  from the consumer group assignment instead of the manual assignment by
  instance index that was used for the shared retry topic.
- retry_consumer: handle rebalance in the middle of a batch: a batch
  must contain messages of a single partition to commit their offsets
  atomically, so when a message from another partition arrives, seek it
  back, commit what was consumed and end the run - the next run will
  query offsets for the right partition.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
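
The mid-batch rebalance rule in the retry_consumer item can be sketched in Go as below. This is a simplified model, not the patch's code: the real consumer receives messages one at a time from the Kafka client, whereas here the polled stream is represented as a slice, and the names message and takeSinglePartition are hypothetical.

```go
package main

// message is a hypothetical, minimal stand-in for a consumed Kafka message.
type message struct {
	Partition int32
	Offset    int64
}

// takeSinglePartition returns the leading messages that share the partition
// of the first message. If a message from another partition is encountered,
// it is returned as seekBack so the caller can seek the consumer back to it,
// commit the batch collected so far, and end the run.
func takeSinglePartition(polled []message) (batch []message, seekBack *message) {
	for i := range polled {
		if polled[i].Partition != polled[0].Partition {
			return polled[:i], &polled[i]
		}
	}
	return polled, nil
}
```

Stopping at the first foreign message, rather than filtering it out and continuing, keeps the batch a contiguous offset range of one partition, which is what allows a single atomic offset commit.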
jitsu-code-review Bot previously approved these changes Jun 5, 2026

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the retry-topic sharding/assignment changes in topic_manager, abstract_batch_consumer, and retry_consumer (including rebalance handling). I found one correctness gap around partition fan-out limits that can leave retry topics unconsumed in some layouts; details are in the inline comment.

Comment thread bulker/bulkerapp/app/topic_manager.go Outdated
…delay

- when consumer assignment has more than 1 partition (topic has more
  partitions than shards or rebalancing in progress) report SystemError
  and continue with the first assigned partition instead of failing the
  run: safe because batch commits offsets of a single partition only
- wait for assignment only while it is empty
- spread retry consumers' start time randomly across the batch period

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
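
Spreading start times across the batch period, as the last item of this commit describes, could look roughly like the sketch below. The function name startDelay and the constant retryBatchPeriod (including its value) are hypothetical; the patch's actual scheduling code is not shown in this conversation.

```go
package main

import (
	"math/rand"
	"time"
)

// retryBatchPeriod is a hypothetical batch period used only for illustration.
const retryBatchPeriod = 5 * time.Minute

// startDelay picks a uniformly random delay in [0, period) so that retry
// consumers created at the same moment do not all run at once.
// math/rand's top-level functions are seeded automatically since Go 1.20.
func startDelay(period time.Duration) time.Duration {
	if period <= 0 {
		return 0 // rand.Int63n panics on a non-positive argument
	}
	return time.Duration(rand.Int63n(int64(period)))
}
```

A consumer would wait startDelay(retryBatchPeriod) before its first run and then continue on its regular period.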
absorbb and others added 3 commits June 5, 2026 14:49
…s than shards

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… partitions instead

Consumer requires exactly 1 assigned partition again. Topic manager
reports SystemError and skips the retry topic entirely when it has
more partitions than shards count.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
jitsu-code-review Bot previously approved these changes Jun 5, 2026

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the retry consumer changes in topic_manager, abstract_batch_consumer, retry_consumer, and cron with focus on partition assignment and run scheduling. I found one correctness risk in retry assignment handling (inline).

Comment thread bulker/bulkerapp/app/abstract_batch_consumer.go
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
jitsu-code-review Bot previously approved these changes Jun 5, 2026

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the retry-consumer and scheduler changes in:
- bulker/bulkerapp/app/abstract_batch_consumer.go
- bulker/bulkerapp/app/retry_consumer.go
- bulker/bulkerapp/app/topic_manager.go
- bulker/bulkerapp/app/cron.go

I focused on retry partition assignment, rebalance behavior, and scheduling/startup offsets. I did not find actionable correctness, security, or user-visible regression issues in this patch.

Group join and rebalance callbacks are served only during poll calls.
The assignment wait loop used plain sleeps, so a freshly created
consumer never obtained its assignment and every run failed with
assignment_error. Poll the consumer instead of sleeping and seek back
any message that slips through before batch processing starts.

Verified with TestEventsRetry integration test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
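
The fix described in this commit (poll instead of sleep while waiting for the assignment) can be sketched as follows. The groupConsumer interface is a hypothetical, reduced view of a Kafka consumer, not the real client API; fakeConsumer exists only to exercise the loop, and the timeout values are illustrative.

```go
package main

import (
	"errors"
	"time"
)

// Hypothetical timings, for illustration only.
const (
	assignmentTimeout = 10 * time.Second
	pollInterval      = 100 * time.Millisecond
)

type polledMessage struct {
	Partition int32
	Offset    int64
}

// groupConsumer is a hypothetical minimal consumer interface.
type groupConsumer interface {
	// Poll serves group join and rebalance callbacks; it may return a message.
	Poll(timeout time.Duration) *polledMessage
	Assignment() []int32
	// SeekBack rewinds the consumer so that m is delivered again later.
	SeekBack(m *polledMessage)
}

// waitForAssignment polls until the assignment is non-empty or the timeout
// expires. Any message returned while waiting is seeked back so the batch
// run reads it again.
func waitForAssignment(c groupConsumer, timeout, interval time.Duration) ([]int32, error) {
	deadline := time.Now().Add(timeout)
	for {
		if a := c.Assignment(); len(a) > 0 {
			return a, nil
		}
		if !time.Now().Before(deadline) {
			return nil, errors.New("assignment_error: no partitions assigned before timeout")
		}
		// A plain time.Sleep here would never let the consumer join the group.
		if m := c.Poll(interval); m != nil {
			c.SeekBack(m)
		}
	}
}

// fakeConsumer joins the group on its joinAfter-th poll and hands out one
// stray message on that same poll.
type fakeConsumer struct {
	joinAfter, polls, seeked int
	stray                    *polledMessage
}

func (f *fakeConsumer) Poll(time.Duration) *polledMessage {
	f.polls++
	if f.polls == f.joinAfter {
		return f.stray
	}
	return nil
}

func (f *fakeConsumer) Assignment() []int32 {
	if f.polls >= f.joinAfter {
		return []int32{0}
	}
	return nil
}

func (f *fakeConsumer) SeekBack(*polledMessage) { f.seeked++ }
```

With a fakeConsumer whose joinAfter is 3, waitForAssignment(f, assignmentTimeout, pollInterval) returns after three polls with one message seeked back.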
jitsu-code-review Bot previously approved these changes Jun 5, 2026

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the retry-consumer partitioning/scheduling changes across topic_manager, abstract_batch_consumer, retry_consumer, and cron. I found one potential correctness regression around retry assignment handling for consumers that legitimately receive no partitions.

Comment thread bulker/bulkerapp/app/abstract_batch_consumer.go
Extract startConsumerOnThisShard helper and use it for both per-destination
topics and the shared destination-messages-retry topic: a retry consumer is
started on a separate shard for each partition instead of on every instance.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@jitsu-code-review jitsu-code-review Bot left a comment

Reviewed the changes in abstract_batch_consumer.go, retry_consumer.go, topic_manager.go, and cron.go, with focus on retry partition assignment, shard/topic consumer placement, and scheduling behavior.

No additional actionable bugs/regressions found beyond the existing open review threads, so I am not duplicating those comments here.

@absorbb absorbb merged commit e9c8796 into newjitsu Jun 5, 2026
5 checks passed
@absorbb absorbb deleted the retry-consumer-multi-partition branch June 5, 2026 12:35