Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
- Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes.
- Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion.

### Pooling

- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`.

### Dependencies
- Updated dependencies:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti

@Override
public void forEach(Consumer<IProtoClient> action) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
action.accept(entry.getClient());
List<IProtoClient> clients = new ArrayList<>();
synchronized (connectionPoolLock) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
clients.add(entry.getClient());
}
}
}
clients.forEach(action);
}

/**
Expand Down
158 changes: 125 additions & 33 deletions tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -146,24 +147,30 @@ final class PoolEntry {
private CompletableFuture<IProtoClient> connectFuture;

/** Last heartbeat state/event. */
private HeartbeatEvent lastHeartbeatEvent;
private volatile HeartbeatEvent lastHeartbeatEvent;

/** Heartbeat timer/task. */
private Timeout heartbeatTask;
private volatile Timeout heartbeatTask;

/** Reconnection task. */
private Timeout reconnectTask;

/** Flag signaling if heartbeat started or not. */
private boolean isHeartbeatStarted;
private volatile boolean isHeartbeatStarted;

/**
* Flag signaling if connection is available or not.
*
* <p>When connection comes to invalidated state or killed, pool entry is locked and connection
* will not be returned to outer client.
*/
private boolean isLocked;
private final AtomicBoolean isLocked = new AtomicBoolean(false);

/**
* Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link
* #internalConnect()} when a new connection generation begins.
*/
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

/** Count of failed pings occurred in invalidated state. */
private int currentDeathPings;
Expand Down Expand Up @@ -273,7 +280,6 @@ public PoolEntry(
this.gracefulShutdown = gracefulShutdown;
this.group = group;
this.index = index;
this.isLocked = false;
this.reconnectAfter = reconnectAfter;
this.tag = group.getTag();
this.timerService = timerService;
Expand Down Expand Up @@ -301,27 +307,63 @@ public IProtoClient getClient() {
}

/**
* Method for locking pool entry.
* Locks the pool entry and increments the pool-wide {@code unavailable} counter on the {@code
* false → true} transition of {@link #isLocked}.
*
* <p>Also increments count of unavailable clients.
* <p>All callers run on Netty IO threads:
*
* <ul>
* <li>{@code ConnectFailure} — initial connect attempt fails; reaches this method via {@link
* #handleConnectError(Object, Throwable)} from the {@code CLOSE_BY_REMOTE} or {@code
* CLOSE_BY_SHUTDOWN} listener registered on {@link #client}.
* <li>{@code ConnectionBreak} — an established connection is closed by the remote side or shut
* down locally; same path as {@code ConnectFailure}.
* <li>{@code HeartbeatInvalidate} — the sliding-window failure rate in {@link #pong} crosses
* the invalidation threshold; reaches this method via {@link #fire(HeartbeatEvent)} for
* {@code INVALIDATE}.
* </ul>
*
* <p>{@code ConnectionBreak} and {@code HeartbeatInvalidate} can fire concurrently (the heartbeat
* decides to invalidate the connection at the same instant the close callback runs). The {@link
* AtomicBoolean#compareAndSet} on {@link #isLocked} ensures only one of the two callers wins the
* {@code false → true} transition, so {@code unavailable} is incremented at most once per lock
* acquisition.
*/
public void lock() {
if (!isLocked) {
if (isLocked.compareAndSet(false, true)) {
unavailable.incrementAndGet();
isLocked = true;
}
}

/**
* Method for unlocking pool entry.
* Unlocks the pool entry, cancels any pending reconnect task, and decrements the pool-wide {@code
* unavailable} counter on the {@code true → false} transition of {@link #isLocked}.
*
* <p>Callers:
*
* <p>Also decrements count of unavailable clients and cancels reconnect task.
* <ul>
* <li>{@code HeartbeatResponse} — Netty IO thread, {@link #pong} reports a healthy response and
* {@link #fire(HeartbeatEvent)} for {@code ACTIVATE} reaches this method.
* <li>{@code ConnectSuccess} — Netty IO thread, {@link #onConnectComplete} reaches this method
* after a successful connect (and {@link #startHeartbeat}).
* <li>{@code UserConfigChange} — user code, when {@code pool.setGroups(...)} shrinks a group.
* Runs on the user thread while it holds {@code connectionPoolLock} (see {@code
* IProtoClientPoolImpl#shrinkGroup}); that lock serialises this path against other pool
* mutators, which is the only reason it is safe for user code to call {@code unlock()}
* directly — no other user-code path reaches this method. Rare in practice; not driven by
* Netty.
* </ul>
*
* <p>{@code HeartbeatResponse} and {@code ConnectSuccess} cannot run at the same time for the
* same entry: the heartbeat is started only after a successful connect, and {@code
* HeartbeatResponse} only follows a prior {@code HeartbeatInvalidate} that already locked the
* entry. The {@link AtomicBoolean#compareAndSet} ensures only one of possibly several concurrent
* unlockers runs {@link #stopReconnectTask} and decrements {@code unavailable}.
*/
public void unlock() {
if (isLocked) {
if (isLocked.compareAndSet(true, false)) {
stopReconnectTask();
unavailable.decrementAndGet();
isLocked = false;
}
}

Expand All @@ -331,7 +373,7 @@ public void unlock() {
* @return {@link #isLocked} value.
*/
public boolean isLocked() {
return isLocked;
return isLocked.get();
}

/** Closes client and stops heartbeat and reconnect tasks if started. */
Expand All @@ -340,27 +382,54 @@ public void close() {
shutdown();
}

/** Closes client and stops heartbeat task is started. */
/**
* Closes the underlying client and stops the heartbeat task.
*
* <p>Performs field mutations under the entry monitor, then releases it before calling {@code
* client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close
* event. Holding the entry monitor across either of those calls would create an ABBA deadlock
* with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and
* then re-enters {@link #handleConnectError(Object, Throwable)} on the entry.
*
* <p>Callers:
*
* <ul>
* <li>{@code PoolClose} — user code, via {@code pool.close()} → {@link #close()} → this method.
* Runs on the user thread that closes the pool.
* <li>{@code ConnectError} — Netty IO thread, via {@link #handleConnectError(Object,
* Throwable)} which fires for both {@code ConnectFailure} (initial connect fails) and
* {@code ConnectionBreak} (established connection drops). Runs on the Netty IO thread
* delivering the close event.
* <li>{@code HeartbeatKill} — Netty IO thread, via {@link #fire(HeartbeatEvent)} for {@code
* KILL} when the death-ping counter crosses the death threshold. Runs on the Netty IO
* thread processing the heartbeat pong.
* </ul>
*
* <p>The {@code isShutdown} {@link AtomicBoolean} guard on the {@code onConnectionClosed} emit
* makes the listener invocation one-shot regardless of how many of the above paths reach this
* method for the same entry.
*/
public void shutdown() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() также надо проанализировать, из каких потоков он может быть вызван.
Как минимум, из потока пользовательского кода (закрытие обоймы соединений) ии... каких еще?

connectFuture = null;
stopHeartbeat();
synchronized (this) {
connectFuture = null;
stopHeartbeat();
}
try {
client.close();
} catch (Exception e) {
log.warn("Cannot close client in pool", e);
}
emit(listener -> listener.onConnectionClosed(tag, index));
if (isShutdown.compareAndSet(false, true)) {
emit(listener -> listener.onConnectionClosed(tag, index));
}
}

/**
* Start client connection process and returns futures.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
public synchronized CompletableFuture<IProtoClient> connect() {
if (connectFuture != null) {
return connectFuture;
}
public CompletableFuture<IProtoClient> connect() {
return internalConnect();
}

Expand Down Expand Up @@ -410,15 +479,23 @@ public void stopHeartbeat() {
/**
* Internal method used by reconnect task and public connect.
*
* <p>See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs
* outside the entry monitor for the same reason.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
private CompletableFuture<IProtoClient> internalConnect() {
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
}
log.info("connect {}/{}", tag, index);
LongTaskTimer.Sample timer = startTimer(connectTime);
CompletableFuture<?> future =
client.connect(group.getAddress(), connectTimeout, gracefulShutdown);
String user = group.getUser();
connectFuture =
CompletableFuture<IProtoClient> cf =
future
.thenCompose(
greeting -> {
Expand All @@ -428,9 +505,16 @@ private CompletableFuture<IProtoClient> internalConnect() {
}
return client.ping(firstPingOpts);
})
.thenApply(r -> client)
.whenComplete(this::onConnectComplete);
return connectFuture;
.thenApply(r -> client);
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
connectFuture = cf;
isShutdown.set(false);
}
cf.whenComplete(this::onConnectComplete);
return cf;
}

/**
Expand Down Expand Up @@ -470,7 +554,9 @@ private void handleConnectError(Object r, Throwable exc) {
return;
}
Throwable failure = exc.getCause() != null ? exc.getCause() : exc;
connectFuture = null;
synchronized (this) {
connectFuture = null;
}
log.error("connect error {}/{}: {}", tag, index, failure.toString());
emit(listener -> listener.onConnectionFailed(tag, index, failure));
lock();
Expand All @@ -481,12 +567,18 @@ private void handleConnectError(Object r, Throwable exc) {
/** Reconnect task scheduler. */
private void connectAfter() {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask == null) {
reconnecting.incrementAndGet();
synchronized (this) {
if (reconnectTask != null) {
// existing task is being replaced; the existing increment in `reconnecting` carries over
// to the new task, so no counter change is needed here.
reconnectTask.cancel();
} else {
reconnecting.incrementAndGet();
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter));
}

Expand Down Expand Up @@ -658,7 +750,7 @@ private void incHeartbeatCounters(int fail) {
}

/** Stops reconnecting task if it is active. */
private void stopReconnectTask() {
private synchronized void stopReconnectTask() {
if (reconnectTask != null) {
reconnecting.decrementAndGet();
reconnectTask.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer<?> container, String command) {

protected int getActiveConnectionsCount(TarantoolContainer<?> tt) {
try {
List<? extends Object> result =
TarantoolContainerClientHelper.executeCommandDecoded(
tt, "return box.stat.net().CONNECTIONS.current");
return (Integer) result.get(0) - 1;
// box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker;
// the loop's fiber.sleep lets it drain pending connections before we read.
String lua =
"local last = box.stat.net().CONNECTIONS.current;"
+ " for i = 1, 50 do"
+ " require('fiber').sleep(0.05);"
+ " local cur = box.stat.net().CONNECTIONS.current;"
+ " if cur == last then return cur - 1 end;"
+ " last = cur;"
+ " end;"
+ " return last - 1";
List<? extends Object> result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua);
return (Integer) result.get(0);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer<?> tt, int basel
return getActiveConnectionsCount(tt) - baseline;
}

/**
* Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why
* a single read is unreliable.
*
* @param tt the Tarantool container under test
* @param expected the expected number of active connections
*/
protected void waitForActiveConnections(TarantoolContainer<?> tt, int expected) {
try {
waitFor(
"Active connections count never reached " + expected,
Duration.ofSeconds(10),
() -> assertEquals(expected, getActiveConnectionsCount(tt)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected MeterRegistry createMetricsRegistry() {
MeterRegistry metricsRegistry = new SimpleMeterRegistry();
LongTaskTimer.builder("request.timer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
assertTrue(pool.hasAvailableClients());
List<IProtoClient> clients = getConnects(pool, "node-a", count1);
assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

tt.stop();
Thread.sleep(1000);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
});

assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

assertEquals(count1, metricsRegistry.get("pool.size").gauge().value());
assertEquals(count1, metricsRegistry.get("pool.available").gauge().value());
Expand Down
Loading