Bulker: support multi-partition retry topics#1348
Conversation
- topic_manager: for retry topics start a consumer on a separate shard for each partition: shards (hash+i) % shardsCount, i in 0..partitions-1. Single-partition topics keep the same shard as before. Remove the single-partition restriction for retry consumers. - abstract_batch_consumer: all retry consumers discover their partition from the consumer group assignment instead of the manual assignment by instance index that was used for the shared retry topic. - retry_consumer: handle rebalance in the middle of a batch: a batch must contain messages of a single partition to commit their offsets atomically, so when a message from another partition arrives, seek it back, commit what was consumed and end the run - the next run will query offsets for the right partition. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Reviewed the retry-topic sharding/assignment changes in topic_manager, abstract_batch_consumer, and retry_consumer (including rebalance handling). I found one correctness gap around partition fan-out limits that can leave retry topics unconsumed in some layouts; details are in the inline comment.
…delay - when consumer assignment has more than 1 partition (topic has more partitions than shards or rebalancing in progress) report SystemError and continue with the first assigned partition instead of failing the run: safe because batch commits offsets of a single partition only - wait for assignment only while it is empty - spread retry consumers' start time randomly across the batch period Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s than shards Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… partitions instead Consumer requires exactly 1 assigned partition again. Topic manager reports SystemError and skips the retry topic entirely when it has more partitions than shards count. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Reviewed the retry-consumer and scheduler changes in:\n- bulker/bulkerapp/app/abstract_batch_consumer.go\n- bulker/bulkerapp/app/retry_consumer.go\n- bulker/bulkerapp/app/topic_manager.go\n- bulker/bulkerapp/app/cron.go\n\nI focused on retry partition assignment, rebalance behavior, and scheduling/startup offsets. I did not find actionable correctness, security, or user-visible regression issues in this patch.
Group join and rebalance callbacks are served only during poll calls. The assignment wait loop used plain sleeps, so a freshly created consumer never obtained its assignment and every run failed with assignment_error. Poll the consumer instead of sleeping and seek back any message that slips through before batch processing starts. Verified with TestEventsRetry integration test. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Reviewed the retry-consumer partitioning/scheduling changes across topic_manager, abstract_batch_consumer, retry_consumer, and cron. I found one potential correctness regression around retry assignment handling for consumers that legitimately receive no partitions.
Extract startConsumerOnThisShard helper and use it for both per-destination topics and the shared destination-messages-retry topic: a retry consumer is started on a separate shard for each partition instead of on every instance. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Reviewed the changes in abstract_batch_consumer.go, retry_consumer.go, topic_manager.go, and cron.go, with focus on retry partition assignment, shard/topic consumer placement, and scheduling behavior.
No additional actionable bugs/regressions found beyond the existing open review threads, so I am not duplicating those comments here.
Summary
Allows retry topics (both per-destination and the shared
destination-messages-retry) to have multiple partitions, consumed in parallel by multiple bulker instances.(hash+i) % shardsCountfori in 0..N-1(capped atshardsCount). Single-partition topics map to exactly the same shard as before, so existing topic→shard placement is unchanged. The arithmetic is done in uint64 to avoid a uint32 overflow corner case that could map two partitions to the same shard for non-power-of-2 shard counts. The "retry topic must have a single partition" restriction is removed (batch topics keep it).Assign(partition = INSTANCE_INDEX)previously used for the shared retry topic is removed — partition distribution is handled by group rebalancing for all retry topics uniformly.Notes / known degraded modes
len(assignment) != 1→assignment_error). Transient multi-assignment (peer instance down/suspended past session timeout) or zero-assignment (more group members than partitions) skips the run with an error until the group stabilizes; offsets are committed transactionally, so no loss — retries are just delayed.SHARDS_COUNT.Test plan
go build ./app/passes🤖 Generated with Claude Code