fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry#100
fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry#100dkasimovskiy wants to merge 6 commits into
Conversation
| * <p>Also decrements count of unavailable clients and cancels reconnect task. | ||
| */ | ||
| public void unlock() { | ||
| public synchronized void unlock() { |
There was a problem hiding this comment.
Хорошо бы отразить здесь в комментарии, из каких потоков может быть вызован этот метод.
- Из потока пользовательского кода: при вызове функции изменения настроек обоймы подключений может вызываться unlock(), когда закрываются лишние). Не вызывается практически никогда. (Событие А).
- Из потока ввода/вывода Netty при обработке ответов на запросы IPROTO_PING (или другого метода для проверки): при анализе ответа может вызываться как lock, так и unlock. (Событие Б).
- Из потока ввода/вывода Netty при обработке события успешного подключения: вызывается unlock. (Событие В).
There was a problem hiding this comment.
Здесь указанные события одновременно произойти не могут:
- Событием А можно пренебречь.
- Событие Б не может произойти одновременно с событием В, так как событие Б случается только после того, как успешно подключились и запустили задачу проверки соединения.
| * <p>Also increments count of unavailable clients. | ||
| */ | ||
| public void lock() { | ||
| public synchronized void lock() { |
There was a problem hiding this comment.
Тоже давай отразим в комментарии, из каких потоков может быть вызван этот метод, и что может случиться:
- Из потока ввода/вывода Netty в случае, когда подключение не удаётся: вызывается handleConnectError. (обозначим как событие A).
- Из потока ввода/вывода Netty в случае, когда происходит обрыв подключения: также вызывается handleConnectError. (обозначим как событие Б).
- Из потока ввода/вывода Netty в случае, когда происходит анализ ответа на запрос PING и принимается решение заблокировать соединение, чтобы вывести его из распределения нагрузки. (обозначим как событие В).
Есть вероятность, что могут случиться одновременно события Б и В (решили вывести соединение из под нагрузки и тут же прилетает его завершение).2
В этом случае может увеличиться дважды счётчик unavailable.
Какие еще события могут произойти?
| * with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and | ||
| * then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. | ||
| */ | ||
| public void shutdown() { |
There was a problem hiding this comment.
shutdown() также надо проанализировать, из каких потоков он может быть вызван.
Как минимум, из потока пользовательского кода (закрытие обоймы соединений) ии... каких еще?
| return (Integer) result.get(0) - 1; | ||
| // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker; | ||
| // the loop's fiber.sleep lets it drain pending connections before we read. | ||
| String lua = |
There was a problem hiding this comment.
Мне кажется, что именно эти изменения, вкупе с изменениями ниже (waitForActiveConnections) и дают наибольшй эффект, нежели попытки обмазать всё блокировками в PoolEntry
There was a problem hiding this comment.
Как минимум для Tarantool 3.5.0 это помогает.
| * will not be returned to outer client. | ||
| */ | ||
| private boolean isLocked; | ||
| private volatile boolean isLocked; |
There was a problem hiding this comment.
Т.к. подключение в обойме может быть затронуто из нескольких потоков, для его блокировки и разблокировки, то вероятно, полезнее будет сделать isLocked как AtomicBoolean и переделать методы блокировки через него. Возможно, этого изменения будет достаточно
…oolEntry PoolEntry state transitions had several races between netty IO threads, the HashedWheelTimer heartbeat worker, and user-facing reconnect calls that surfaced as ABBA deadlocks, NPEs on inline connect failures, and leaked connections after a KILL/reconnect cycle in CI. PoolEntry: - Synchronize state mutations and mark connectFuture, heartbeatTask, reconnectTask, lastHeartbeatEvent, isLocked, and isShutdown volatile/AtomicBoolean for cross-thread visibility. - Narrow the entry-monitor critical sections to field mutations; call client.close() and emit() outside the monitor to break the ABBA deadlock between ConnectionImpl and PoolEntry monitors that hung DistributingRoundRobinBalancerTest. - Return the local connect future from internalConnect() so an inline connect failure cannot leave the caller observing a null connectFuture after handleConnectError() nulls it for reconnect. - Keep client.close() in shutdown() on every invocation (closeChannel is idempotent if already closed) but guard only the onConnectionClosed emit, so a KILL-then-reconnect cycle cannot leak the new connection when its auth/ping subsequently fails. - Serialize connectAfter() reconnect-task scheduling to avoid double scheduling; add a double-check of connectFuture in internalConnect() to return the in-flight future instead of starting a new connect. IProtoClientPoolImpl: - Synchronize forEach() on connectionPoolLock to avoid CME under concurrent setGroups(). Tests (BasePoolTest / ConnectionPoolReconnectsTest): - Wait for box.stat.net().CONNECTIONS.current to stabilise in getActiveConnectionsCount: the IProto worker updates it asynchronously, so a single read often lags by 5-15 connections when 20+ are opened in a burst. - Collapse the wait-for-stable Lua script to a single line so tarantool emits one YAML document (SnakeYAML rejects multi-document streams). - Wait for the active connection count to reach the expected value in ConnectionPoolReconnectsTest post-reconnect assertions via a new waitForActiveConnections() helper. Verified locally on 3.5.0 and 2.11.8: ConnectionPoolReconnectsTest, ConnectionPoolTest, ConnectionPoolHeartbeatTest, DistributingRoundRobinBalancerTest, and unit tests all pass consistently where they were previously flaky.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…oolEntry Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap. Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first. `IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback.
…oolEntry Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap. Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first. `IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback.
…threading Make `isLocked` an `AtomicBoolean` and switch `lock()`/`unlock()` to `compareAndSet` so the `unavailable` counter increments/decrements exactly once per transition even when `ConnectionBreak` and `HeartbeatInvalidate` race on the same entry. Document every call site of `lock()`, `unlock()`, and `shutdown()` in JavaDoc: the full event name, the thread it runs on, and (for the `UserConfigChange` caller of `unlock()`) the lock it must hold to be safe. Addresses the remaining review threads on PR #100 (PRRT_kwDOQchuhc6LMf1w, PRRT_kwDOQchuhc6LMXld, PRRT_kwDOQchuhc6LMnIM). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1347025 to
3e37768
Compare
Move log.info outside synchronized block in connectAfter method
ab54164 to
7216e4c
Compare
Анализ многопоточных вызовов методов
|
| Метод | Изменение | Что даёт |
|---|---|---|
lock() (L332) |
synchronized + volatile boolean isLocked → AtomicBoolean isLocked + compareAndSet(false, true) |
lock-free; ровно один unavailable.increment за false → true |
unlock() (L363) |
synchronized + volatile boolean isLocked → AtomicBoolean isLocked + compareAndSet(true, false) |
lock-free; ровно один unavailable.decrement + stopReconnectTask за true → false |
isLocked() (L375) |
return isLocked; → return isLocked.get(); |
идемпотентный геттер |
connect() (L432) |
synchronized → обычный |
развязка с монитором internalConnect (тот использует короткие synchronized блоки вокруг connectFuture) |
internalConnect() (L487) |
один synchronized метод → два коротких synchronized блока вокруг connectFuture |
client.connect() крутится без удержания entry-монитора → развязка с client.close() в shutdown (ABBA) |
connectAfter() (L568) |
log.info вынесен из-под synchronized; synchronized блок у́же |
лог больше не под локом; counter-логика та же |
stopReconnectTask() (L749) |
без изменений тела, остался synchronized |
— |
shutdown() (L412) |
synchronized { connectFuture = null; stopHeartbeat(); } (без изменений) + isShutdown.compareAndSet на emit |
монитор по-прежнему отпускается до client.close() (ABBA); emit — lock-free one-shot |
handleConnectError() (L552) |
synchronized { connectFuture = null; } (без изменений) |
— |
close() (L380) |
без изменений | — |
fire() (L664) |
без изменений, но lock/unlock/shutdown стали CAS/lock-free |
— |
Поля: isLocked, isShutdown — AtomicBoolean; connectFuture, reconnectTask —
обычные ссылки под synchronized (this). JavaDoc на lock/unlock/shutdown расширен
полными именами событий и пометками о потоках — это уже сделано, ниже только анализ.
Участники многопоточной системы
| Поток | Обозначение | Чем занят | Как попадает в PoolEntry |
|---|---|---|---|
| Пользовательский поток | T_user |
Код приложения: pool.acquire(), pool.close(), pool.setGroups(...), pool.forEach(...) |
Через публичный API IProtoClientPoolImpl, который сам сериализует мутации на connectionPoolLock |
| Netty IO поток | T_io |
Обработка ответов IProto, доставка close-колбэков | Через client.onClose(CLOSE_BY_REMOTE/SHUTDOWN, this::handleConnectError) и через whenComplete колбэки на CompletableFuture от IProto-запросов (connect, authorize, ping) |
| HashedWheelTimer поток | T_timer |
Срабатывание Timeout-ов: heartbeat-ping и reconnect-задачи |
Через timerService.newTimeout(this::ping, ...) и timerService.newTimeout(timeout -> internalConnect(), ...) |
Каждый PoolEntry делит между этими потоками одно и то же состояние (счётчики,
флаги, future-ы, таймеры). Координация — три механизма, у каждого своя роль:
AtomicReference-подобные CAS /AtomicBoolean(isLocked,isShutdown) —
одно-полевые lock-free переходы состояния, выдерживают гонки между
T_ioиT_timerбез блокировок.synchronized (this)блоки вокругconnectFutureиreconnectTask— это
много-полевые согласованные мутации (прочитать-проверить-установить или
set(null)вместе сstopHeartbeat()). Удерживаются коротко и нигде не
перекрывают I/O-операции (client.connect,client.close).synchronizedвIProtoClientPoolImpl(connectionPoolLock) — сериализует
мутации пула со стороны пользовательского кода.
Покомандная карта вызовов
lock()
| Источник | Поток | Событие |
|---|---|---|
handleConnectError(r, exc) L562 |
T_io (Netty close listener) |
ConnectFailure или ConnectionBreak |
fire(HeartbeatEvent.INVALIDATE) L673 |
T_io (heartbeat pong whenComplete) |
HeartbeatInvalidate |
CAS на isLocked гарантирует: ровно один unavailable.incrementAndGet,
проигравший — no-op.
sequenceDiagram
autonumber
participant N1 as T_io #1<br/>(close callback)
participant N2 as T_io #2<br/>(heartbeat pong)
participant E as PoolEntry
participant C as unavailable
par Close path
N1->>E: handleConnectError → lock()
and Heartbeat path
N2->>E: pong → fire(INVALIDATE) → lock()
end
Note over E: isLocked.compareAndSet(false, true)
E-->>N1: true (winner) | false (loser)
E-->>N2: true (winner) | false (loser)
alt winner
E->>C: incrementAndGet()
else loser
E--xC: no-op
end
unlock()
| Источник | Поток | Событие |
|---|---|---|
fire(HeartbeatEvent.ACTIVATE) L677 |
T_io (heartbeat pong) |
HeartbeatResponse |
onConnectComplete(r, exc) L541 |
T_io (connect whenComplete) |
ConnectSuccess |
IProtoClientPoolImpl.shrinkGroup (внешний) |
T_user, удерживает connectionPoolLock |
UserConfigChange |
HeartbeatResponse/ConnectSuccess идут на запертую запись (heartbeat стартует
после коннекта, HeartbeatResponse — только после HeartbeatInvalidate); гонки
T_io ↔ T_io решает CAS. UserConfigChange сериализован с T_user через
connectionPoolLock, с T_io может пересечься только в окне
unlock → close.
sequenceDiagram
autonumber
participant U as T_user<br/>(setGroups)
participant N1 as T_io #1<br/>(pong)
participant E as PoolEntry
participant P as IProtoClientPoolImpl
Note over U,P: U holds connectionPoolLock
U->>P: shrinkGroup() walks entries
loop each dropped entry
U->>E: entry.unlock() — UserConfigChange
end
par concurrent
N1->>E: pong → fire(ACTIVATE) → unlock() — HeartbeatResponse
end
Note over E: isLocked.compareAndSet(true, false)
E-->>U: true | false
E-->>N1: true | false
Note over E: only one wins,<br/>loser no-ops
shutdown()
| Источник | Поток | Событие |
|---|---|---|
close() L382 |
T_user (pool.close()) |
PoolClose |
handleConnectError(r, exc) L563 |
T_io (Netty close listener) |
ConnectError (включает ConnectFailure + ConnectionBreak) |
fire(HeartbeatEvent.KILL) L681 |
T_io (heartbeat pong) |
HeartbeatKill |
Три фазы с разными гарантиями — synchronize полей, I/O под ConnectionImpl-монитором, lock-free emit:
flowchart TD
A[shutdown entry] --> B["synchronized (this) { connectFuture = null; stopHeartbeat(); }"]
B --> C["client.close() —<br/>acquires ConnectionImpl monitor"]
C --> D["isShutdown.compareAndSet(false, true)<br/>— lock-free one-shot"]
D -->|won| E[emit onConnectionClosed]
D -->|lost| F[no-op]
internalConnect()
| Источник | Поток | Событие |
|---|---|---|
connect() L433 |
T_user (публичный API) |
Acquire |
Reconnect-таска из connectAfter |
T_timer |
ReconnectFired |
sequenceDiagram
autonumber
participant U as T_user
participant T as T_timer
participant E as PoolEntry
participant C as IProtoClient
rect rgb(245, 245, 245)
Note over E: 1st sync block — fast path
U->>E: internalConnect()
T->>E: internalConnect() (reconnect task)
E->>E: synchronized: if (connectFuture != null) return it
end
Note over U,T: Both pass: connectFuture is null
U->>C: client.connect() — outside lock
T->>C: client.connect() — outside lock
rect rgb(245, 245, 245)
Note over E: 2nd sync block — install winner
E->>E: synchronized: if (cf already installed) return winner's,<br/> else install mine
end
alt winner
E->>E: isShutdown.set(false)
E->>E: cf.whenComplete(onConnectComplete)
E-->>winner: return cf
else loser
E-->>loser: return winner's cf
Note over E: loser's cf silently completes,<br/> onConnectComplete fires once on winner's path
end
Двойной synchronized даёт race-free установку winner-а без удержания entry-монитора
через сетевой client.connect() — иначе T_user/T_timer выстраивались бы в
очередь на время коннекта.
connectAfter()
| Источник | Поток | Событие |
|---|---|---|
handleConnectError(r, exc) L564 |
T_io (close listener) |
ConnectError |
fire(HeartbeatEvent.KILL) L683 |
T_io (heartbeat pong) |
HeartbeatKill |
Тело:
flowchart TD
A[connectAfter entry] --> B[log.info — outside lock]
B --> C["newTask = timerService.newTimeout(...) — outside lock"]
C --> D["synchronized (this) { old = reconnectTask.getAndSet(newTask) ... }"]
D -->|old == null| E["reconnecting.incrementAndGet()"]
D -->|old != null| F["old.cancel() —<br/>counter stays (the increment carries over)"]
E --> G[emit onReconnectScheduled]
F --> G
log.info и newTimeout — вне лока. Counter-семантика: первый шедул +1,
замена переносит старый инкремент, stopReconnectTask — -1.
stopReconnectTask()
| Источник | Поток | Событие |
|---|---|---|
close() L381 |
T_user |
PoolClose |
unlock() L365 (когда CAS выигран) |
T_user (UserConfigChange) или T_io (HeartbeatResponse/ConnectSuccess) |
зависит от пути |
Тело — атомарный test-and-clear под synchronized (this):
reconnectTask != null:reconnecting.decrementAndGet(); reconnectTask.cancel(); reconnectTask = null;reconnectTask == null: no-op.
Сериализует гонку T_user.close ↔ T_io.unlock-CAS-winner ↔
T_timer-вызванный connectAfter (см. ниже раздел про переплетение путей).
handleConnectError()
| Источник | Поток | Событие |
|---|---|---|
Netty close listener CLOSE_BY_REMOTE |
T_io |
ConnectionBreak |
Netty close listener CLOSE_BY_SHUTDOWN |
T_io |
ConnectFailure или ConnectionBreak |
Только T_io, но T_io у разных коннектов — разные потоки, поэтому
конкуренция в synchronized блоке за connectFuture реальна. После
зануления connectFuture идёт не-защищённая последовательность:
emit → lock (CAS) → shutdown (synchronized блок) → connectAfter
(synchronized блок). Каждый из этих шагов самодостаточен по синхронизации,
но они не атомарны как единая последовательность — между ними может
вклиниться другой T_io или T_user.
close()
| Источник | Поток | Событие |
|---|---|---|
IProtoClientPoolImpl.close() |
T_user |
PoolClose |
stopReconnectTask() + shutdown(). Оба под локом / с защитой; T_io
может параллельно прислать close-callback в handleConnectError, что
приведёт ко второму shutdown() (см. ABBA-развязку выше).
Зависимости между потоками
Граф «кто кого вызывает» (стрелка = «вызывает метод, который исполняется
на»):
flowchart LR
user["T_user<br/>(pool operations)"]
io1["T_io #1<br/>(close listener)"]
io2["T_io #2<br/>(connect/ping whenComplete)"]
timer["T_timer<br/>(HashedWheelTimer)"]
user -->|connect| internalConnect
user -->|"unlock(shrinkGroup)"| unlock
user -->|close| shutdown
io1 -->|handleConnectError| lock
io1 -->|handleConnectError| shutdown
io1 -->|handleConnectError| connectAfter
io2 -->|pong → fire| lock
io2 -->|pong → fire| unlock
io2 -->|pong → fire| shutdown
io2 -->|onConnectComplete| unlock
timer -->|ping fires| io2
timer -->|reconnect task| internalConnect
internalConnect -.allocates.-> io2
connectAfter -.schedules.-> timer
Кто кого ждёт (прямые блокировки):
T_user → T_io:T_userне блокируется наT_ioвconnect(). Возвращённый
CompletableFuture— асинхронный;T_userсам решает, делать ли
.get()или.whenComplete. Это обратное давление на пользователя,
а не блокировка внутриPoolEntry.T_io → T_timer: не блокируется.connectAfterшедулит задачу и сразу
возвращается.pongтоже не ждёт — он толькоnextPing()шедулит.T_timer → T_io: не блокируется.ping()отправляет запрос, регистрирует
whenCompleteи возвращается. Ответ обработается наT_ioасинхронно.T_user → T_user: пользовательские мутации (setGroups,close)
сериализуются наconnectionPoolLockвнутриIProtoClientPoolImpl.
Acquire/release этого лока не берут.
Сетевая развязка (нет блокировок через сеть):
client.connect(...)(вinternalConnect) исполняется внеsynchronized.
Это ключевое отличие от старой монолитнойsynchronizedобёртки: параллельные
T_user/T_timerне выстраиваются в очередь на время коннекта.client.close()(вshutdown) исполняется внеsynchronized. Entry-монитор
нужен только чтобы сериализовать занулениеconnectFutureиstopHeartbeat;
дальшеclient.close()берётConnectionImpl-монитор сам, и мы обязаны
к этому моменту отпустить entry-монитор, иначе ABBA с Netty close-callback.
Lock-ordering: Netty close-callback берёт ConnectionImpl → entry; shutdown
берёт entry → отпускает → ConnectionImpl. Один глобальный порядок, без циклов.
Summary
PoolEntry had several races between netty IO threads, the HashedWheelTimer heartbeat worker, and reconnect calls, manifesting as ABBA deadlocks, NPEs on inline connect failures, and leaked connections after a KILL/reconnect cycle in CI.
Production fixes
PoolEntry
volatile/AtomicBooleanfor cross-thread visibility.client.close()andemit()outside the monitor to break the ABBA deadlock betweenConnectionImplandPoolEntrymonitors that hungDistributingRoundRobinBalancerTest.internalConnect()so an inline connect failure cannot leave the caller observing a nullconnectFutureafterhandleConnectError()nulls it for reconnect.client.close()inshutdown()on every invocation (idempotent viacloseChannelno-op) but guard only theonConnectionClosedemit, so a KILL-then-reconnect cycle cannot leak the new connection when its auth/ping subsequently fails.connectAfter()reconnect-task scheduling to avoid double scheduling; double-checkconnectFutureininternalConnect()to return the in-flight future instead of starting a new connect.IProtoClientPoolImpl
forEach()onconnectionPoolLockto avoid CME under concurrentsetGroups().Test fixes
box.stat.net().CONNECTIONS.currentto stabilise inBasePoolTest.getActiveConnectionsCount: the IProto worker updates it asynchronously, so a single read often lags by 5-15 connections when 20+ are opened in a burst.ConnectionPoolReconnectsTestpost-reconnect assertions via a newwaitForActiveConnections()helper.I haven't forgotten about:
Related issues: