From 1cfd4d46e940035e51be79e7af60f4981711fe5d Mon Sep 17 00:00:00 2001 From: Kenneth VanderLinde Date: Fri, 12 Jun 2026 13:19:31 -0700 Subject: [PATCH 1/3] Require AbstractConnection to be initialized with an id --- .../simple/connection/AbstractConnection.java | 10 ++++++++++ .../simple/connection/DirectConnection.java | 8 +------- .../simple/connection/SocketConnection.java | 10 ++-------- .../simple/connection/WebRTCConnection.java | 10 ++-------- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java index 4e7fb134ed..fd4033d96b 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java @@ -33,6 +33,7 @@ public abstract class AbstractConnection implements Connection { private static final Logger log = LogManager.getLogger(AbstractConnection.class); + private final String id; private final AtomicBoolean closed = new AtomicBoolean(false); private final BlockingQueue outQueue = new LinkedBlockingQueue<>(); @@ -40,6 +41,15 @@ public abstract class AbstractConnection implements Connection { private final List listeners = new CopyOnWriteArrayList<>(); private final List messageHandlers = new CopyOnWriteArrayList<>(); + protected AbstractConnection(String id) { + this.id = id; + } + + @Override + public final String getId() { + return id; + } + @Override public final void close() { if (closed.compareAndSet(false, true)) { diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/DirectConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/DirectConnection.java index 5b589c46b8..a0f1210883 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/DirectConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/DirectConnection.java @@ -40,7 +40,6 @@ public static Pair create(String id) { } private final AtomicBoolean sharedClosedFlag; - private final String id; private final BlockingQueue writeQueue; private final ReceiveThread receiveThread; @@ -49,8 +48,8 @@ private DirectConnection( String id, BlockingQueue writeQueue, BlockingQueue readQueue) { + super(id); this.sharedClosedFlag = sharedClosedFlag; - this.id = id; this.writeQueue = writeQueue; this.receiveThread = new ReceiveThread(readQueue); } @@ -89,11 +88,6 @@ public boolean isAlive() { return !sharedClosedFlag.get(); } - @Override - public String getId() { - return id; - } - @Override public String getError() { return null; diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java index 42124691c5..ef854c76d2 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java @@ -27,7 +27,6 @@ public class SocketConnection extends AbstractConnection implements Connection { /** Instance used for log messages. */ private static final Logger log = LogManager.getLogger(SocketConnection.class); - private final String id; private SendThread send; private ReceiveThread receive; private Socket socket; @@ -35,23 +34,18 @@ public class SocketConnection extends AbstractConnection implements Connection { private int port; public SocketConnection(String id, String hostName, int port) { - this.id = id; + super(id); this.hostName = hostName; this.port = port; } public SocketConnection(String id, Socket socket) { - this.id = id; + super(id); this.socket = socket; initialize(socket); } - @Override - public String getId() { - return id; - } - private void initialize(Socket socket) { this.socket = socket; this.send = new SendThread(socket); diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java index 124c6d732b..4c1eb5cb43 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java @@ -39,7 +39,6 @@ public interface Listener { private final RTCDataChannelObserver rtcDataChannelObserver = new RTCDataChannelObserverImpl(); private final PeerConnectionFactory factory = new PeerConnectionFactory(); private final String serverName; - private final String id; private final Gson gson = new Gson(); private final Listener listener; private WebSocketClient signalingClient; @@ -57,7 +56,7 @@ public interface Listener { // used from client side public WebRTCConnection(String id, String serverName, Listener listener) { - this.id = id; + super(id); this.serverName = serverName; this.listener = listener; init(); @@ -65,7 +64,7 @@ public WebRTCConnection(String id, String serverName, Listener listener) { // this is used from the server side public WebRTCConnection(OfferMessageDto message, WebRTCServer webRTCServer) { - this.id = message.source; + super(message.source); this.server = webRTCServer; this.serverName = server.getName(); this.listener = () -> {}; @@ -225,11 +224,6 @@ public boolean isAlive() { }; } - @Override - public String getId() { - return id; - } - @Override public void open() throws IOException { startSignaling(); From 34d206c877bf483048f9f55393b10009b37bf8e7 Mon Sep 17 00:00:00 2001 From: Kenneth VanderLinde Date: Mon, 15 Jun 2026 12:57:11 -0700 Subject: [PATCH 2/3] Move AbstractConnection#readMessage() and #writeMessage() into SocketConnection and WebRTCConnection as each is only used in one place --- .../simple/connection/AbstractConnection.java | 90 ------------------- .../simple/connection/SocketConnection.java | 55 +++++++++++- .../simple/connection/WebRTCConnection.java | 41 +++++++++ 3 files changed, 94 insertions(+), 92 deletions(-) diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java index fd4033d96b..978399dec7 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/AbstractConnection.java @@ -15,7 +15,6 @@ package net.rptools.clientserver.simple.connection; import java.io.*; -import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -125,95 +124,6 @@ protected final void dispatchCompressedMessage(byte[] compressedMessage) { dispatchMessage(message); } - protected final void writeMessage(OutputStream out, byte[] message) throws IOException { - int length = message.length; - - notifyListeners(ActivityListener.Direction.Outbound, ActivityListener.State.Start, length, 0); - - out.write(length >> 24); - out.write(length >> 16); - out.write(length >> 8); - out.write(length); - - for (int i = 0; i < message.length; i++) { - out.write(message[i]); - - if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) { - notifyListeners( - ActivityListener.Direction.Outbound, ActivityListener.State.Progress, length, i); - } - } - out.flush(); - notifyListeners( - ActivityListener.Direction.Outbound, ActivityListener.State.Complete, length, length); - } - - protected final byte[] readMessage(InputStream in) throws IOException { - int b32 = in.read(); - int b24 = in.read(); - int b16 = in.read(); - int b8 = in.read(); - - if (b32 < 0) { - throw new IOException("Stream closed"); - } - int length = (b32 << 24) + (b24 << 16) + (b16 << 8) + b8; - - notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0); - - byte[] ret = new byte[length]; - for (int i = 0; i < length; i++) { - ret[i] = (byte) in.read(); - - if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) { - notifyListeners( - ActivityListener.Direction.Inbound, ActivityListener.State.Progress, length, i); - } - } - notifyListeners( - ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length); - return ret; - } - - private ByteBuffer messageBuffer = null; - - protected final byte[] readMessage(ByteBuffer part) { - if (messageBuffer == null) { - int length = part.getInt(); - notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0); - - if (part.remaining() == length) { - var ret = new byte[length]; - part.get(ret); - notifyListeners( - ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length); - return ret; - } - - messageBuffer = ByteBuffer.allocate(length); - } - - messageBuffer.put(part); - notifyListeners( - ActivityListener.Direction.Inbound, - ActivityListener.State.Progress, - messageBuffer.capacity(), - messageBuffer.position()); - - if (messageBuffer.capacity() == messageBuffer.position()) { - notifyListeners( - ActivityListener.Direction.Inbound, - ActivityListener.State.Complete, - messageBuffer.capacity(), - messageBuffer.capacity()); - var ret = messageBuffer.array(); - messageBuffer = null; - return ret; - } - - return null; - } - protected final void fireDisconnect() { for (DisconnectHandler handler : disconnectHandlers) { handler.handleDisconnect(this); diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java index ef854c76d2..4a39b0e192 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java @@ -17,6 +17,7 @@ import java.io.*; import java.net.Socket; import java.net.SocketTimeoutException; +import net.rptools.clientserver.ActivityListener; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -118,7 +119,7 @@ public void run() { } try { - SocketConnection.this.writeMessage(out, message); + writeMessage(out, message); } catch (IOException e) { log.error("Error while writing message. Closing connection.", e); return; @@ -128,6 +129,29 @@ public void run() { SocketConnection.this.close(); } } + + protected final void writeMessage(OutputStream out, byte[] message) throws IOException { + int length = message.length; + + notifyListeners(ActivityListener.Direction.Outbound, ActivityListener.State.Start, length, 0); + + out.write(length >> 24); + out.write(length >> 16); + out.write(length >> 8); + out.write(length); + + for (int i = 0; i < message.length; i++) { + out.write(message[i]); + + if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) { + notifyListeners( + ActivityListener.Direction.Outbound, ActivityListener.State.Progress, length, i); + } + } + out.flush(); + notifyListeners( + ActivityListener.Direction.Outbound, ActivityListener.State.Complete, length, length); + } } // ///////////////////////////////////////////////////////////////////////// @@ -154,7 +178,7 @@ public void run() { while (!SocketConnection.this.isClosed() && SocketConnection.this.isAlive()) { try { - byte[] message = SocketConnection.this.readMessage(in); + byte[] message = readMessage(in); SocketConnection.this.dispatchCompressedMessage(message); } catch (SocketTimeoutException e) { log.warn("Lost client {}", SocketConnection.this.getId(), e); @@ -172,5 +196,32 @@ public void run() { fireDisconnect(); } } + + private byte[] readMessage(InputStream in) throws IOException { + int b32 = in.read(); + int b24 = in.read(); + int b16 = in.read(); + int b8 = in.read(); + + if (b32 < 0) { + throw new IOException("Stream closed"); + } + int length = (b32 << 24) + (b24 << 16) + (b16 << 8) + b8; + + notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0); + + byte[] ret = new byte[length]; + for (int i = 0; i < length; i++) { + ret[i] = (byte) in.read(); + + if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) { + notifyListeners( + ActivityListener.Direction.Inbound, ActivityListener.State.Progress, length, i); + } + } + notifyListeners( + ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length); + return ret; + } } } diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java index 4c1eb5cb43..3aa96b7dd2 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/WebRTCConnection.java @@ -21,6 +21,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import net.rptools.clientserver.ActivityListener; import net.rptools.clientserver.simple.server.WebRTCServer; import net.rptools.clientserver.simple.webrtc.*; import org.apache.logging.log4j.LogManager; @@ -566,5 +567,45 @@ public void onMessage(RTCDataChannelBuffer channelBuffer) { dispatchCompressedMessage(message); } } + + private ByteBuffer messageBuffer = null; + + private byte[] readMessage(ByteBuffer part) { + if (messageBuffer == null) { + int length = part.getInt(); + notifyListeners( + ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0); + + if (part.remaining() == length) { + var ret = new byte[length]; + part.get(ret); + notifyListeners( + ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length); + return ret; + } + + messageBuffer = ByteBuffer.allocate(length); + } + + messageBuffer.put(part); + notifyListeners( + ActivityListener.Direction.Inbound, + ActivityListener.State.Progress, + messageBuffer.capacity(), + messageBuffer.position()); + + if (messageBuffer.capacity() == messageBuffer.position()) { + notifyListeners( + ActivityListener.Direction.Inbound, + ActivityListener.State.Complete, + messageBuffer.capacity(), + messageBuffer.capacity()); + var ret = messageBuffer.array(); + messageBuffer = null; + return ret; + } + + return null; + } } } From d6257624e83890c30dd22a8f09ba9bb2eb6435df Mon Sep 17 00:00:00 2001 From: Kenneth VanderLinde Date: Wed, 17 Jun 2026 12:13:05 -0700 Subject: [PATCH 3/3] Fix NPE for SocketConnection.socket --- .../simple/connection/SocketConnection.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java index 4a39b0e192..69d17dcfd8 100644 --- a/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java +++ b/clientserver/src/main/java/net/rptools/clientserver/simple/connection/SocketConnection.java @@ -28,9 +28,12 @@ public class SocketConnection extends AbstractConnection implements Connection { /** Instance used for log messages. */ private static final Logger log = LogManager.getLogger(SocketConnection.class); + // Only valid for open connections. private SendThread send; private ReceiveThread receive; private Socket socket; + + // Only valid for pending connections before #open() is called. private String hostName; private int port; @@ -42,12 +45,12 @@ public SocketConnection(String id, String hostName, int port) { public SocketConnection(String id, Socket socket) { super(id); - this.socket = socket; - - initialize(socket); + connect(socket); } - private void initialize(Socket socket) { + private void connect(Socket socket) { + assert this.socket != null : "Should only call #connect() not already open"; + this.socket = socket; this.send = new SendThread(socket); this.receive = new ReceiveThread(socket); @@ -58,7 +61,11 @@ private void initialize(Socket socket) { @Override public void open() throws IOException { - initialize(new Socket(hostName, port)); + if (this.socket != null) { + throw new IOException("The connection has already been opened."); + } + + connect(new Socket(hostName, port)); } @Override @@ -68,6 +75,11 @@ public void sendMessage(Object channel, byte[] message) { @Override protected void onClose() { + if (socket == null) { + // Not open, so nothing to do. + return; + } + receive.interrupt(); send.interrupt(); @@ -80,7 +92,7 @@ protected void onClose() { @Override public boolean isAlive() { - return !socket.isClosed(); + return socket != null && !socket.isClosed(); } @Override