From 1d660d5f971770474615d17f2294349ce9e9441e Mon Sep 17 00:00:00 2001 From: Benedict Mondini Date: Mon, 1 Jul 2024 17:59:11 +0200 Subject: [PATCH] Use virtual Threads (#55) --- .../java/dev/bitbite/networking/Client.java | 4 +--- .../dev/bitbite/networking/ClientManager.java | 11 --------- .../networking/CommunicationHandler.java | 13 ++++++++-- .../dev/bitbite/networking/IOHandler.java | 24 +++++++++++++++---- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/main/java/dev/bitbite/networking/Client.java b/src/main/java/dev/bitbite/networking/Client.java index 35d75e8..76f36e8 100644 --- a/src/main/java/dev/bitbite/networking/Client.java +++ b/src/main/java/dev/bitbite/networking/Client.java @@ -136,14 +136,12 @@ public boolean connect() { if(this.readThread != null) { this.readThread.interrupt(); } - this.readThread = new Thread(()->{ + this.readThread = Thread.ofVirtual().name("read-thread").start(()->{ while(!Thread.interrupted()) { this.iOHandler.read(); } Thread.currentThread().interrupt(); }); - this.readThread.setName("Data reader"); - this.readThread.start(); this.disconnectedServerDetector.start(); } catch (Exception e) { this.notifyListeners(EventType.CONNECTION_FAILED, e); diff --git a/src/main/java/dev/bitbite/networking/ClientManager.java b/src/main/java/dev/bitbite/networking/ClientManager.java index 9b1a5e0..dc5abdd 100644 --- a/src/main/java/dev/bitbite/networking/ClientManager.java +++ b/src/main/java/dev/bitbite/networking/ClientManager.java @@ -16,7 +16,6 @@ public class ClientManager extends Thread { private boolean closing = false; @Getter private final Server server; - private Thread readThread; @Getter private CopyOnWriteArrayList communicationHandler; /** @@ -37,13 +36,6 @@ public ClientManager(Server server) { @Override public void run() { this.server.notifyListeners(Server.EventType.ACCEPT_START); - this.readThread = new Thread(()->{ - while(!readThread.isInterrupted()) { - this.communicationHandler.forEach(ch -> ch.getIOHandler().read()); - } - }); - readThread.setName("Data reader"); - readThread.start(); while(!Thread.currentThread().isInterrupted()) { if(this.server.getServerSocket().isClosed()) { Thread.currentThread().interrupt(); @@ -90,9 +82,6 @@ public void run() { public boolean close() { closing = true; Thread.currentThread().interrupt(); - if(this.readThread != null) { - this.readThread.interrupt(); - } this.communicationHandler.forEach(ch -> ch.close()); return true; } diff --git a/src/main/java/dev/bitbite/networking/CommunicationHandler.java b/src/main/java/dev/bitbite/networking/CommunicationHandler.java index ad95ca0..8b6814a 100644 --- a/src/main/java/dev/bitbite/networking/CommunicationHandler.java +++ b/src/main/java/dev/bitbite/networking/CommunicationHandler.java @@ -2,7 +2,7 @@ import java.io.IOException; import java.net.Socket; -import java.util.ArrayList; +import java.util.List; import dev.bitbite.networking.Server.EventType; import lombok.Getter; @@ -15,6 +15,7 @@ public class CommunicationHandler { private Socket clientSocket; private ClientManager clientManager; @Getter private IOHandler iOHandler; + private Thread readThread; /** * Creates a CommunicationHandler object for a socket @@ -32,6 +33,11 @@ public CommunicationHandler(Socket clientSocket, ClientManager clientManager) { } catch (IOException e) { this.clientManager.getServer().notifyListeners(Server.EventType.COMMUNICATIONHANDLER_INIT_FAILED, e); } + this.readThread = Thread.ofVirtual().name("readthread-"+getIP()).start(() -> { + while(!readThread.isInterrupted()) { + this.iOHandler.readBlocking(); + } + }); } /** @@ -40,6 +46,8 @@ public CommunicationHandler(Socket clientSocket, ClientManager clientManager) { public void close() { this.clientManager.getServer().notifyListeners(EventType.COMMUNICATIONHANDLER_CLOSE, this); try { + this.readThread.interrupt(); + this.readThread.join(100); this.iOHandler.close(); this.clientSocket.close(); this.clientManager.removeCommunicationHandler(this); @@ -96,11 +104,12 @@ public void registerListener(IOHandlerListener listener) { * Registers a list of listeners to the underlying IOHandler * @param listener to add */ - public void registerListener(ArrayList listener) { + public void registerListener(List listener) { listener.forEach(l -> this.iOHandler.registerListener(l)); } /** + * Returns the remote socket address of the associated client socket * @return the remote socket address of the associated client socket */ public String getIP() { diff --git a/src/main/java/dev/bitbite/networking/IOHandler.java b/src/main/java/dev/bitbite/networking/IOHandler.java index 7a042d2..f865b5c 100644 --- a/src/main/java/dev/bitbite/networking/IOHandler.java +++ b/src/main/java/dev/bitbite/networking/IOHandler.java @@ -66,9 +66,9 @@ public IOHandler(InputStream inputStream, OutputStream outputStream, Consumer(); + this.readBuffer = new ArrayList<>(); this.readCallback = onRead; - this.listeners = new ArrayList(); + this.listeners = new ArrayList<>(); this.lastRead = System.nanoTime(); } @@ -99,8 +99,9 @@ public void read() { return; } try { - if(inputStream.available() > 0) { - readNBytes(MAX_READ_SIZE); + int available = 0; + if((available = inputStream.available()) > 0) { + readNBytes(Math.min(available, MAX_READ_SIZE)); } } catch (SocketException e) { if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed")) { @@ -113,6 +114,19 @@ public void read() { } } + public void readBlocking() { + if(closing || closed) { + return; + } + try { + this.notifyListeners(EventType.DATA_READ_START); + readToNBytes(MAX_READ_SIZE); + this.notifyListeners(EventType.DATA_READ_END); + } catch (Exception e) { + this.notifyListeners(EventType.DATA_READ_FAILED, e); + } + } + /** * Tries to read a set amount of bytes from the stream. * If an end-of-message byte is detected the read bytes are passed to the @@ -143,7 +157,7 @@ protected void readNBytes(int amount) { } } } catch (SocketException e) { - if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed")) { + if(e.getMessage().contains("Connection reset") || e.getMessage().contains("Socket closed") || e.getMessage().contains("Broken pipe")) { close(); } else { this.notifyListeners(EventType.DATA_READ_FAILED, e);