Skip to content

fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry#100

Draft
dkasimovskiy wants to merge 6 commits into
masterfrom
fix/connection-pool-flaky-tests
Draft

fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry#100
dkasimovskiy wants to merge 6 commits into
masterfrom
fix/connection-pool-flaky-tests

Conversation

@dkasimovskiy

@dkasimovskiy dkasimovskiy commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Summary

PoolEntry had several races between netty IO threads, the HashedWheelTimer heartbeat worker, and reconnect calls, manifesting as ABBA deadlocks, NPEs on inline connect failures, and leaked connections after a KILL/reconnect cycle in CI.

Production fixes

PoolEntry

  • Synchronize state mutations; mark fields volatile/AtomicBoolean for cross-thread visibility.
  • Narrow monitor critical sections to field mutations; call client.close() and emit() outside the monitor to break the ABBA deadlock between ConnectionImpl and PoolEntry monitors that hung DistributingRoundRobinBalancerTest.
  • Return the local chain future from internalConnect() so an inline connect failure cannot leave the caller observing a null connectFuture after handleConnectError() nulls it for reconnect.
  • Keep client.close() in shutdown() on every invocation (idempotent via closeChannel no-op) but guard only the onConnectionClosed emit, so a KILL-then-reconnect cycle cannot leak the new connection when its auth/ping subsequently fails.
  • Serialize connectAfter() reconnect-task scheduling to avoid double scheduling; double-check connectFuture in internalConnect() to return the in-flight future instead of starting a new connect.

IProtoClientPoolImpl

  • Synchronize forEach() on connectionPoolLock to avoid CME under concurrent setGroups().

Test fixes

  • Wait for box.stat.net().CONNECTIONS.current to stabilise in BasePoolTest.getActiveConnectionsCount: the IProto worker updates it asynchronously, so a single read often lags by 5-15 connections when 20+ are opened in a burst.
  • Collapse the wait-for-stable Lua script to a single line so tarantool emits one YAML document (SnakeYAML rejects multi-document streams).
  • Wait for the active connection count to reach the expected value in ConnectionPoolReconnectsTest post-reconnect assertions via a new waitForActiveConnections() helper.

I haven't forgotten about:

  • Tests
  • Changelog
  • Documentation
    • JavaDoc was written
  • Commit messages comply with the guideline
  • Cleanup the code for review. See checklist

Related issues:

* <p>Also decrements count of unavailable clients and cancels reconnect task.
*/
public void unlock() {
public synchronized void unlock() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хорошо бы отразить здесь в комментарии, из каких потоков может быть вызован этот метод.

  1. Из потока пользовательского кода: при вызове функции изменения настроек обоймы подключений может вызываться unlock(), когда закрываются лишние). Не вызывается практически никогда. (Событие А).
  2. Из потока ввода/вывода Netty при обработке ответов на запросы IPROTO_PING (или другого метода для проверки): при анализе ответа может вызываться как lock, так и unlock. (Событие Б).
  3. Из потока ввода/вывода Netty при обработке события успешного подключения: вызывается unlock. (Событие В).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь указанные события одновременно произойти не могут:

  1. Событием А можно пренебречь.
  2. Событие Б не может произойти одновременно с событием В, так как событие Б случается только после того, как успешно подключились и запустили задачу проверки соединения.

* <p>Also increments count of unavailable clients.
*/
public void lock() {
public synchronized void lock() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тоже давай отразим в комментарии, из каких потоков может быть вызван этот метод, и что может случиться:

  1. Из потока ввода/вывода Netty в случае, когда подключение не удаётся: вызывается handleConnectError. (обозначим как событие A).
  2. Из потока ввода/вывода Netty в случае, когда происходит обрыв подключения: также вызывается handleConnectError. (обозначим как событие Б).
  3. Из потока ввода/вывода Netty в случае, когда происходит анализ ответа на запрос PING и принимается решение заблокировать соединение, чтобы вывести его из распределения нагрузки. (обозначим как событие В).

Есть вероятность, что могут случиться одновременно события Б и В (решили вывести соединение из под нагрузки и тут же прилетает его завершение).2

В этом случае может увеличиться дважды счётчик unavailable.

Какие еще события могут произойти?

* with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and
* then re-enters {@link #handleConnectError(Object, Throwable)} on the entry.
*/
public void shutdown() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() также надо проанализировать, из каких потоков он может быть вызван.
Как минимум, из потока пользовательского кода (закрытие обоймы соединений) ии... каких еще?

return (Integer) result.get(0) - 1;
// box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker;
// the loop's fiber.sleep lets it drain pending connections before we read.
String lua =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, что именно эти изменения, вкупе с изменениями ниже (waitForActiveConnections) и дают наибольшй эффект, нежели попытки обмазать всё блокировками в PoolEntry

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как минимум для Tarantool 3.5.0 это помогает.

* will not be returned to outer client.
*/
private boolean isLocked;
private volatile boolean isLocked;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Т.к. подключение в обойме может быть затронуто из нескольких потоков, для его блокировки и разблокировки, то вероятно, полезнее будет сделать isLocked как AtomicBoolean и переделать методы блокировки через него. Возможно, этого изменения будет достаточно

dkasimovskiy and others added 5 commits June 22, 2026 15:47
…oolEntry

PoolEntry state transitions had several races between netty IO threads,
the HashedWheelTimer heartbeat worker, and user-facing reconnect calls
that surfaced as ABBA deadlocks, NPEs on inline connect failures, and
leaked connections after a KILL/reconnect cycle in CI.

PoolEntry:
- Synchronize state mutations and mark connectFuture, heartbeatTask,
  reconnectTask, lastHeartbeatEvent, isLocked, and isShutdown
  volatile/AtomicBoolean for cross-thread visibility.
- Narrow the entry-monitor critical sections to field mutations; call
  client.close() and emit() outside the monitor to break the ABBA
  deadlock between ConnectionImpl and PoolEntry monitors that hung
  DistributingRoundRobinBalancerTest.
- Return the local connect future from internalConnect() so an inline
  connect failure cannot leave the caller observing a null
  connectFuture after handleConnectError() nulls it for reconnect.
- Keep client.close() in shutdown() on every invocation (closeChannel
  is idempotent if already closed) but guard only the
  onConnectionClosed emit, so a KILL-then-reconnect cycle cannot
  leak the new connection when its auth/ping subsequently fails.
- Serialize connectAfter() reconnect-task scheduling to avoid double
  scheduling; add a double-check of connectFuture in internalConnect()
  to return the in-flight future instead of starting a new connect.

IProtoClientPoolImpl:
- Synchronize forEach() on connectionPoolLock to avoid CME under
  concurrent setGroups().

Tests (BasePoolTest / ConnectionPoolReconnectsTest):
- Wait for box.stat.net().CONNECTIONS.current to stabilise in
  getActiveConnectionsCount: the IProto worker updates it
  asynchronously, so a single read often lags by 5-15 connections
  when 20+ are opened in a burst.
- Collapse the wait-for-stable Lua script to a single line so
  tarantool emits one YAML document (SnakeYAML rejects
  multi-document streams).
- Wait for the active connection count to reach the expected value
  in ConnectionPoolReconnectsTest post-reconnect assertions via a
  new waitForActiveConnections() helper.

Verified locally on 3.5.0 and 2.11.8: ConnectionPoolReconnectsTest,
ConnectionPoolTest, ConnectionPoolHeartbeatTest,
DistributingRoundRobinBalancerTest, and unit tests all pass
consistently where they were previously flaky.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…oolEntry

Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap.
Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first.
`IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback.
…oolEntry

Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap.
Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first.
`IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback.
…threading

Make `isLocked` an `AtomicBoolean` and switch `lock()`/`unlock()` to
`compareAndSet` so the `unavailable` counter increments/decrements exactly
once per transition even when `ConnectionBreak` and `HeartbeatInvalidate`
race on the same entry.

Document every call site of `lock()`, `unlock()`, and `shutdown()` in
JavaDoc: the full event name, the thread it runs on, and (for the
`UserConfigChange` caller of `unlock()`) the lock it must hold to be safe.

Addresses the remaining review threads on PR #100 (PRRT_kwDOQchuhc6LMf1w,
PRRT_kwDOQchuhc6LMXld, PRRT_kwDOQchuhc6LMnIM).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@dkasimovskiy dkasimovskiy force-pushed the fix/connection-pool-flaky-tests branch 3 times, most recently from 1347025 to 3e37768 Compare June 22, 2026 15:56
Move log.info outside synchronized block in connectAfter method
@dkasimovskiy dkasimovskiy force-pushed the fix/connection-pool-flaky-tests branch from ab54164 to 7216e4c Compare June 23, 2026 12:45
@dkasimovskiy

dkasimovskiy commented Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

Анализ многопоточных вызовов методов PoolEntry, изменённых в ветке

Контекст

В ветке fix/connection-pool-flaky-tests относительно master изменены следующие методы
tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java:

Метод Изменение Что даёт
lock() (L332) synchronized + volatile boolean isLockedAtomicBoolean isLocked + compareAndSet(false, true) lock-free; ровно один unavailable.increment за false → true
unlock() (L363) synchronized + volatile boolean isLockedAtomicBoolean isLocked + compareAndSet(true, false) lock-free; ровно один unavailable.decrement + stopReconnectTask за true → false
isLocked() (L375) return isLocked;return isLocked.get(); идемпотентный геттер
connect() (L432) synchronized → обычный развязка с монитором internalConnect (тот использует короткие synchronized блоки вокруг connectFuture)
internalConnect() (L487) один synchronized метод → два коротких synchronized блока вокруг connectFuture client.connect() крутится без удержания entry-монитора → развязка с client.close() в shutdown (ABBA)
connectAfter() (L568) log.info вынесен из-под synchronized; synchronized блок у́же лог больше не под локом; counter-логика та же
stopReconnectTask() (L749) без изменений тела, остался synchronized
shutdown() (L412) synchronized { connectFuture = null; stopHeartbeat(); } (без изменений) + isShutdown.compareAndSet на emit монитор по-прежнему отпускается до client.close() (ABBA); emit — lock-free one-shot
handleConnectError() (L552) synchronized { connectFuture = null; } (без изменений)
close() (L380) без изменений
fire() (L664) без изменений, но lock/unlock/shutdown стали CAS/lock-free

Поля: isLocked, isShutdownAtomicBoolean; connectFuture, reconnectTask
обычные ссылки под synchronized (this). JavaDoc на lock/unlock/shutdown расширен
полными именами событий и пометками о потоках — это уже сделано, ниже только анализ.

Участники многопоточной системы

Поток Обозначение Чем занят Как попадает в PoolEntry
Пользовательский поток T_user Код приложения: pool.acquire(), pool.close(), pool.setGroups(...), pool.forEach(...) Через публичный API IProtoClientPoolImpl, который сам сериализует мутации на connectionPoolLock
Netty IO поток T_io Обработка ответов IProto, доставка close-колбэков Через client.onClose(CLOSE_BY_REMOTE/SHUTDOWN, this::handleConnectError) и через whenComplete колбэки на CompletableFuture от IProto-запросов (connect, authorize, ping)
HashedWheelTimer поток T_timer Срабатывание Timeout-ов: heartbeat-ping и reconnect-задачи Через timerService.newTimeout(this::ping, ...) и timerService.newTimeout(timeout -> internalConnect(), ...)

Каждый PoolEntry делит между этими потоками одно и то же состояние (счётчики,
флаги, future-ы, таймеры). Координация — три механизма, у каждого своя роль:

  1. AtomicReference-подобные CAS / AtomicBoolean (isLocked, isShutdown) —
    одно-полевые lock-free переходы состояния, выдерживают гонки между
    T_io и T_timer без блокировок.
  2. synchronized (this) блоки вокруг connectFuture и reconnectTask — это
    много-полевые согласованные мутации (прочитать-проверить-установить или
    set(null) вместе с stopHeartbeat()). Удерживаются коротко и нигде не
    перекрывают I/O-операции (client.connect, client.close).
  3. synchronized в IProtoClientPoolImpl (connectionPoolLock) — сериализует
    мутации пула со стороны пользовательского кода.

Покомандная карта вызовов

lock()

Источник Поток Событие
handleConnectError(r, exc) L562 T_io (Netty close listener) ConnectFailure или ConnectionBreak
fire(HeartbeatEvent.INVALIDATE) L673 T_io (heartbeat pong whenComplete) HeartbeatInvalidate

CAS на isLocked гарантирует: ровно один unavailable.incrementAndGet,
проигравший — no-op.

sequenceDiagram
    autonumber
    participant N1 as T_io #1<br/>(close callback)
    participant N2 as T_io #2<br/>(heartbeat pong)
    participant E as PoolEntry
    participant C as unavailable

    par Close path
        N1->>E: handleConnectError → lock()
    and Heartbeat path
        N2->>E: pong → fire(INVALIDATE) → lock()
    end

    Note over E: isLocked.compareAndSet(false, true)
    E-->>N1: true (winner) | false (loser)
    E-->>N2: true (winner) | false (loser)
    alt winner
        E->>C: incrementAndGet()
    else loser
        E--xC: no-op
    end
Loading

unlock()

Источник Поток Событие
fire(HeartbeatEvent.ACTIVATE) L677 T_io (heartbeat pong) HeartbeatResponse
onConnectComplete(r, exc) L541 T_io (connect whenComplete) ConnectSuccess
IProtoClientPoolImpl.shrinkGroup (внешний) T_user, удерживает connectionPoolLock UserConfigChange

HeartbeatResponse/ConnectSuccess идут на запертую запись (heartbeat стартует
после коннекта, HeartbeatResponse — только после HeartbeatInvalidate); гонки
T_ioT_io решает CAS. UserConfigChange сериализован с T_user через
connectionPoolLock, с T_io может пересечься только в окне
unlock → close.

sequenceDiagram
    autonumber
    participant U as T_user<br/>(setGroups)
    participant N1 as T_io #1<br/>(pong)
    participant E as PoolEntry
    participant P as IProtoClientPoolImpl

    Note over U,P: U holds connectionPoolLock
    U->>P: shrinkGroup() walks entries
    loop each dropped entry
        U->>E: entry.unlock() — UserConfigChange
    end

    par concurrent
        N1->>E: pong → fire(ACTIVATE) → unlock() — HeartbeatResponse
    end

    Note over E: isLocked.compareAndSet(true, false)
    E-->>U: true | false
    E-->>N1: true | false
    Note over E: only one wins,<br/>loser no-ops
Loading

shutdown()

Источник Поток Событие
close() L382 T_user (pool.close()) PoolClose
handleConnectError(r, exc) L563 T_io (Netty close listener) ConnectError (включает ConnectFailure + ConnectionBreak)
fire(HeartbeatEvent.KILL) L681 T_io (heartbeat pong) HeartbeatKill

Три фазы с разными гарантиями — synchronize полей, I/O под ConnectionImpl-монитором, lock-free emit:

flowchart TD
    A[shutdown entry] --> B["synchronized (this) { connectFuture = null; stopHeartbeat(); }"]
    B --> C["client.close() —<br/>acquires ConnectionImpl monitor"]
    C --> D["isShutdown.compareAndSet(false, true)<br/>— lock-free one-shot"]
    D -->|won| E[emit onConnectionClosed]
    D -->|lost| F[no-op]
Loading

internalConnect()

Источник Поток Событие
connect() L433 T_user (публичный API) Acquire
Reconnect-таска из connectAfter T_timer ReconnectFired
sequenceDiagram
    autonumber
    participant U as T_user
    participant T as T_timer
    participant E as PoolEntry
    participant C as IProtoClient

    rect rgb(245, 245, 245)
    Note over E: 1st sync block — fast path
    U->>E: internalConnect()
    T->>E: internalConnect() (reconnect task)
    E->>E: synchronized: if (connectFuture != null) return it
    end

    Note over U,T: Both pass: connectFuture is null
    U->>C: client.connect() — outside lock
    T->>C: client.connect() — outside lock

    rect rgb(245, 245, 245)
    Note over E: 2nd sync block — install winner
    E->>E: synchronized: if (cf already installed) return winner's,<br/> else install mine
    end

    alt winner
        E->>E: isShutdown.set(false)
        E->>E: cf.whenComplete(onConnectComplete)
        E-->>winner: return cf
    else loser
        E-->>loser: return winner's cf
        Note over E: loser's cf silently completes,<br/> onConnectComplete fires once on winner's path
    end
Loading

Двойной synchronized даёт race-free установку winner-а без удержания entry-монитора
через сетевой client.connect() — иначе T_user/T_timer выстраивались бы в
очередь на время коннекта.

connectAfter()

Источник Поток Событие
handleConnectError(r, exc) L564 T_io (close listener) ConnectError
fire(HeartbeatEvent.KILL) L683 T_io (heartbeat pong) HeartbeatKill

Тело:

flowchart TD
    A[connectAfter entry] --> B[log.info — outside lock]
    B --> C["newTask = timerService.newTimeout(...) — outside lock"]
    C --> D["synchronized (this) { old = reconnectTask.getAndSet(newTask) ... }"]
    D -->|old == null| E["reconnecting.incrementAndGet()"]
    D -->|old != null| F["old.cancel() —<br/>counter stays (the increment carries over)"]
    E --> G[emit onReconnectScheduled]
    F --> G
Loading

log.info и newTimeout — вне лока. Counter-семантика: первый шедул +1,
замена переносит старый инкремент, stopReconnectTask-1.

stopReconnectTask()

Источник Поток Событие
close() L381 T_user PoolClose
unlock() L365 (когда CAS выигран) T_user (UserConfigChange) или T_io (HeartbeatResponse/ConnectSuccess) зависит от пути

Тело — атомарный test-and-clear под synchronized (this):

  • reconnectTask != null: reconnecting.decrementAndGet(); reconnectTask.cancel(); reconnectTask = null;
  • reconnectTask == null: no-op.

Сериализует гонку T_user.closeT_io.unlock-CAS-winner ↔
T_timer-вызванный connectAfter (см. ниже раздел про переплетение путей).

handleConnectError()

Источник Поток Событие
Netty close listener CLOSE_BY_REMOTE T_io ConnectionBreak
Netty close listener CLOSE_BY_SHUTDOWN T_io ConnectFailure или ConnectionBreak

Только T_io, но T_io у разных коннектов — разные потоки, поэтому
конкуренция в synchronized блоке за connectFuture реальна. После
зануления connectFuture идёт не-защищённая последовательность:
emitlock (CAS) → shutdown (synchronized блок) → connectAfter
(synchronized блок). Каждый из этих шагов самодостаточен по синхронизации,
но они не атомарны как единая последовательность — между ними может
вклиниться другой T_io или T_user.

close()

Источник Поток Событие
IProtoClientPoolImpl.close() T_user PoolClose

stopReconnectTask() + shutdown(). Оба под локом / с защитой; T_io
может параллельно прислать close-callback в handleConnectError, что
приведёт ко второму shutdown() (см. ABBA-развязку выше).

Зависимости между потоками

Граф «кто кого вызывает» (стрелка = «вызывает метод, который исполняется
на»):

flowchart LR
    user["T_user<br/>(pool operations)"]
    io1["T_io #1<br/>(close listener)"]
    io2["T_io #2<br/>(connect/ping whenComplete)"]
    timer["T_timer<br/>(HashedWheelTimer)"]

    user -->|connect| internalConnect
    user -->|"unlock(shrinkGroup)"| unlock
    user -->|close| shutdown

    io1 -->|handleConnectError| lock
    io1 -->|handleConnectError| shutdown
    io1 -->|handleConnectError| connectAfter

    io2 -->|pong → fire| lock
    io2 -->|pong → fire| unlock
    io2 -->|pong → fire| shutdown
    io2 -->|onConnectComplete| unlock

    timer -->|ping fires| io2
    timer -->|reconnect task| internalConnect
    internalConnect -.allocates.-> io2
    connectAfter -.schedules.-> timer
Loading

Кто кого ждёт (прямые блокировки):

  • T_user → T_io: T_user не блокируется на T_io в connect(). Возвращённый
    CompletableFuture — асинхронный; T_user сам решает, делать ли
    .get() или .whenComplete. Это обратное давление на пользователя,
    а не блокировка внутри PoolEntry.
  • T_io → T_timer: не блокируется. connectAfter шедулит задачу и сразу
    возвращается. pong тоже не ждёт — он только nextPing() шедулит.
  • T_timer → T_io: не блокируется. ping() отправляет запрос, регистрирует
    whenComplete и возвращается. Ответ обработается на T_io асинхронно.
  • T_user → T_user: пользовательские мутации (setGroups, close)
    сериализуются на connectionPoolLock внутри IProtoClientPoolImpl.
    Acquire/release этого лока не берут.

Сетевая развязка (нет блокировок через сеть):

  • client.connect(...)internalConnect) исполняется вне synchronized.
    Это ключевое отличие от старой монолитной synchronized обёртки: параллельные
    T_user/T_timer не выстраиваются в очередь на время коннекта.
  • client.close()shutdown) исполняется вне synchronized. Entry-монитор
    нужен только чтобы сериализовать зануление connectFuture и stopHeartbeat;
    дальше client.close() берёт ConnectionImpl-монитор сам, и мы обязаны
    к этому моменту отпустить entry-монитор, иначе ABBA с Netty close-callback.

Lock-ordering: Netty close-callback берёт ConnectionImpl → entry; shutdown
берёт entry → отпускает → ConnectionImpl. Один глобальный порядок, без циклов.

@dkasimovskiy dkasimovskiy marked this pull request as draft June 25, 2026 08:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants