Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package net.rptools.clientserver.simple.connection;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -33,13 +32,23 @@
public abstract class AbstractConnection implements Connection {
private static final Logger log = LogManager.getLogger(AbstractConnection.class);

private final String id;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<byte[]> outQueue = new LinkedBlockingQueue<>();

private final List<DisconnectHandler> disconnectHandlers = new CopyOnWriteArrayList<>();
private final List<ActivityListener> listeners = new CopyOnWriteArrayList<>();
private final List<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();

protected AbstractConnection(String id) {
this.id = id;
}

@Override
public final String getId() {
return id;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down Expand Up @@ -115,95 +124,6 @@ protected final void dispatchCompressedMessage(byte[] compressedMessage) {
dispatchMessage(message);
}

protected final void writeMessage(OutputStream out, byte[] message) throws IOException {
int length = message.length;

notifyListeners(ActivityListener.Direction.Outbound, ActivityListener.State.Start, length, 0);

out.write(length >> 24);
out.write(length >> 16);
out.write(length >> 8);
out.write(length);

for (int i = 0; i < message.length; i++) {
out.write(message[i]);

if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) {
notifyListeners(
ActivityListener.Direction.Outbound, ActivityListener.State.Progress, length, i);
}
}
out.flush();
notifyListeners(
ActivityListener.Direction.Outbound, ActivityListener.State.Complete, length, length);
}

protected final byte[] readMessage(InputStream in) throws IOException {
int b32 = in.read();
int b24 = in.read();
int b16 = in.read();
int b8 = in.read();

if (b32 < 0) {
throw new IOException("Stream closed");
}
int length = (b32 << 24) + (b24 << 16) + (b16 << 8) + b8;

notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0);

byte[] ret = new byte[length];
for (int i = 0; i < length; i++) {
ret[i] = (byte) in.read();

if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) {
notifyListeners(
ActivityListener.Direction.Inbound, ActivityListener.State.Progress, length, i);
}
}
notifyListeners(
ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length);
return ret;
}

private ByteBuffer messageBuffer = null;

protected final byte[] readMessage(ByteBuffer part) {
if (messageBuffer == null) {
int length = part.getInt();
notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0);

if (part.remaining() == length) {
var ret = new byte[length];
part.get(ret);
notifyListeners(
ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length);
return ret;
}

messageBuffer = ByteBuffer.allocate(length);
}

messageBuffer.put(part);
notifyListeners(
ActivityListener.Direction.Inbound,
ActivityListener.State.Progress,
messageBuffer.capacity(),
messageBuffer.position());

if (messageBuffer.capacity() == messageBuffer.position()) {
notifyListeners(
ActivityListener.Direction.Inbound,
ActivityListener.State.Complete,
messageBuffer.capacity(),
messageBuffer.capacity());
var ret = messageBuffer.array();
messageBuffer = null;
return ret;
}

return null;
}

protected final void fireDisconnect() {
for (DisconnectHandler handler : disconnectHandlers) {
handler.handleDisconnect(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public static Pair create(String id) {
}

private final AtomicBoolean sharedClosedFlag;
private final String id;
private final BlockingQueue<byte[]> writeQueue;
private final ReceiveThread receiveThread;

Expand All @@ -49,8 +48,8 @@ private DirectConnection(
String id,
BlockingQueue<byte[]> writeQueue,
BlockingQueue<byte[]> readQueue) {
super(id);
this.sharedClosedFlag = sharedClosedFlag;
this.id = id;
this.writeQueue = writeQueue;
this.receiveThread = new ReceiveThread(readQueue);
}
Expand Down Expand Up @@ -89,11 +88,6 @@ public boolean isAlive() {
return !sharedClosedFlag.get();
}

@Override
public String getId() {
return id;
}

@Override
public String getError() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.*;
import java.net.Socket;
import java.net.SocketTimeoutException;
import net.rptools.clientserver.ActivityListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -27,32 +28,29 @@ public class SocketConnection extends AbstractConnection implements Connection {
/** Instance used for log messages. */
private static final Logger log = LogManager.getLogger(SocketConnection.class);

private final String id;
// Only valid for open connections.
private SendThread send;
private ReceiveThread receive;
private Socket socket;

// Only valid for pending connections before #open() is called.
private String hostName;
private int port;

public SocketConnection(String id, String hostName, int port) {
this.id = id;
super(id);
this.hostName = hostName;
this.port = port;
}

public SocketConnection(String id, Socket socket) {
this.id = id;
this.socket = socket;

initialize(socket);
super(id);
connect(socket);
}

@Override
public String getId() {
return id;
}
private void connect(Socket socket) {
assert this.socket != null : "Should only call #connect() not already open";

private void initialize(Socket socket) {
this.socket = socket;
this.send = new SendThread(socket);
this.receive = new ReceiveThread(socket);
Expand All @@ -63,7 +61,11 @@ private void initialize(Socket socket) {

@Override
public void open() throws IOException {
initialize(new Socket(hostName, port));
if (this.socket != null) {
throw new IOException("The connection has already been opened.");
}

connect(new Socket(hostName, port));
}

@Override
Expand All @@ -73,6 +75,11 @@ public void sendMessage(Object channel, byte[] message) {

@Override
protected void onClose() {
if (socket == null) {
// Not open, so nothing to do.
return;
}

receive.interrupt();
send.interrupt();

Expand All @@ -85,7 +92,7 @@ protected void onClose() {

@Override
public boolean isAlive() {
return !socket.isClosed();
return socket != null && !socket.isClosed();
}

@Override
Expand Down Expand Up @@ -124,7 +131,7 @@ public void run() {
}

try {
SocketConnection.this.writeMessage(out, message);
writeMessage(out, message);
} catch (IOException e) {
log.error("Error while writing message. Closing connection.", e);
return;
Expand All @@ -134,6 +141,29 @@ public void run() {
SocketConnection.this.close();
}
}

protected final void writeMessage(OutputStream out, byte[] message) throws IOException {
int length = message.length;

notifyListeners(ActivityListener.Direction.Outbound, ActivityListener.State.Start, length, 0);

out.write(length >> 24);
out.write(length >> 16);
out.write(length >> 8);
out.write(length);

for (int i = 0; i < message.length; i++) {
out.write(message[i]);

if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) {
notifyListeners(
ActivityListener.Direction.Outbound, ActivityListener.State.Progress, length, i);
}
}
out.flush();
notifyListeners(
ActivityListener.Direction.Outbound, ActivityListener.State.Complete, length, length);
}
}

// /////////////////////////////////////////////////////////////////////////
Expand All @@ -160,7 +190,7 @@ public void run() {

while (!SocketConnection.this.isClosed() && SocketConnection.this.isAlive()) {
try {
byte[] message = SocketConnection.this.readMessage(in);
byte[] message = readMessage(in);
SocketConnection.this.dispatchCompressedMessage(message);
} catch (SocketTimeoutException e) {
log.warn("Lost client {}", SocketConnection.this.getId(), e);
Expand All @@ -178,5 +208,32 @@ public void run() {
fireDisconnect();
}
}

private byte[] readMessage(InputStream in) throws IOException {
int b32 = in.read();
int b24 = in.read();
int b16 = in.read();
int b8 = in.read();

if (b32 < 0) {
throw new IOException("Stream closed");
}
int length = (b32 << 24) + (b24 << 16) + (b16 << 8) + b8;

notifyListeners(ActivityListener.Direction.Inbound, ActivityListener.State.Start, length, 0);

byte[] ret = new byte[length];
for (int i = 0; i < length; i++) {
ret[i] = (byte) in.read();

if (i != 0 && i % ActivityListener.CHUNK_SIZE == 0) {
notifyListeners(
ActivityListener.Direction.Inbound, ActivityListener.State.Progress, length, i);
}
}
notifyListeners(
ActivityListener.Direction.Inbound, ActivityListener.State.Complete, length, length);
return ret;
}
}
}
Loading
Loading