Skip to content

Commit

Permalink
Use virtual Threads (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
netcodedev committed Jul 1, 2024
1 parent 831d987 commit 1d660d5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
4 changes: 1 addition & 3 deletions src/main/java/dev/bitbite/networking/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,12 @@ public boolean connect() {
if(this.readThread != null) {
this.readThread.interrupt();
}
this.readThread = new Thread(()->{
this.readThread = Thread.ofVirtual().name("read-thread").start(()->{
while(!Thread.interrupted()) {
this.iOHandler.read();
}
Thread.currentThread().interrupt();
});
this.readThread.setName("Data reader");
this.readThread.start();
this.disconnectedServerDetector.start();
} catch (Exception e) {
this.notifyListeners(EventType.CONNECTION_FAILED, e);
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/dev/bitbite/networking/ClientManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class ClientManager extends Thread {

private boolean closing = false;
@Getter private final Server server;
private Thread readThread;
@Getter private CopyOnWriteArrayList<CommunicationHandler> communicationHandler;

/**
Expand All @@ -37,13 +36,6 @@ public ClientManager(Server server) {
@Override
public void run() {
this.server.notifyListeners(Server.EventType.ACCEPT_START);
this.readThread = new Thread(()->{
while(!readThread.isInterrupted()) {
this.communicationHandler.forEach(ch -> ch.getIOHandler().read());
}
});
readThread.setName("Data reader");
readThread.start();
while(!Thread.currentThread().isInterrupted()) {
if(this.server.getServerSocket().isClosed()) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -90,9 +82,6 @@ public void run() {
public boolean close() {
closing = true;
Thread.currentThread().interrupt();
if(this.readThread != null) {
this.readThread.interrupt();
}
this.communicationHandler.forEach(ch -> ch.close());
return true;
}
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/dev/bitbite/networking/CommunicationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import dev.bitbite.networking.Server.EventType;
import lombok.Getter;
Expand All @@ -15,6 +15,7 @@ public class CommunicationHandler {
private Socket clientSocket;
private ClientManager clientManager;
@Getter private IOHandler iOHandler;
private Thread readThread;

/**
* Creates a CommunicationHandler object for a socket
Expand All @@ -32,6 +33,11 @@ public CommunicationHandler(Socket clientSocket, ClientManager clientManager) {
} catch (IOException e) {
this.clientManager.getServer().notifyListeners(Server.EventType.COMMUNICATIONHANDLER_INIT_FAILED, e);
}
this.readThread = Thread.ofVirtual().name("readthread-"+getIP()).start(() -> {
while(!readThread.isInterrupted()) {
this.iOHandler.readBlocking();
}
});
}

/**
Expand All @@ -40,6 +46,8 @@ public CommunicationHandler(Socket clientSocket, ClientManager clientManager) {
public void close() {
this.clientManager.getServer().notifyListeners(EventType.COMMUNICATIONHANDLER_CLOSE, this);
try {
this.readThread.interrupt();
this.readThread.join(100);
this.iOHandler.close();
this.clientSocket.close();
this.clientManager.removeCommunicationHandler(this);
Expand Down Expand Up @@ -96,11 +104,12 @@ public void registerListener(IOHandlerListener listener) {
* Registers a list of listeners to the underlying IOHandler
* @param listener to add
*/
public void registerListener(ArrayList<IOHandlerListener> listener) {
public void registerListener(List<IOHandlerListener> listener) {
listener.forEach(l -> this.iOHandler.registerListener(l));
}

/**
* Returns the remote socket address of the associated client socket
* @return the remote socket address of the associated client socket
*/
public String getIP() {
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/dev/bitbite/networking/IOHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public IOHandler(InputStream inputStream, OutputStream outputStream, Consumer<by
}
this.inputStream = inputStream;
this.outputStream = outputStream;
this.readBuffer = new ArrayList<Byte>();
this.readBuffer = new ArrayList<>();
this.readCallback = onRead;
this.listeners = new ArrayList<IOHandlerListener>();
this.listeners = new ArrayList<>();
this.lastRead = System.nanoTime();
}

Expand Down Expand Up @@ -99,8 +99,9 @@ public void read() {
return;
}
try {
if(inputStream.available() > 0) {
readNBytes(MAX_READ_SIZE);
int available = 0;
if((available = inputStream.available()) > 0) {
readNBytes(Math.min(available, MAX_READ_SIZE));
}
} catch (SocketException e) {
if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed")) {
Expand All @@ -113,6 +114,19 @@ public void read() {
}
}

public void readBlocking() {
if(closing || closed) {
return;
}
try {
this.notifyListeners(EventType.DATA_READ_START);
readToNBytes(MAX_READ_SIZE);
this.notifyListeners(EventType.DATA_READ_END);
} catch (Exception e) {
this.notifyListeners(EventType.DATA_READ_FAILED, e);
}
}

/**
* Tries to read a set amount of bytes from the stream.
* If an end-of-message byte is detected the read bytes are passed to the
Expand Down Expand Up @@ -143,7 +157,7 @@ protected void readNBytes(int amount) {
}
}
} catch (SocketException e) {
if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed")) {
if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed") || e.getMessage().contains("Broken pipe")) {
close();
} else {
this.notifyListeners(EventType.DATA_READ_FAILED, e);
Expand Down

0 comments on commit 1d660d5

Please sign in to comment.