From 5e29bfe4c21b78c783a1915754860a112edf7d98 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 13:55:20 -0500 Subject: [PATCH] Maintain pending futures and cancel them at cleanup. Signed-off-by: HenrikJannsen --- .../network/p2p/peers/BroadcastHandler.java | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 56ae58b9584..88b25d90bdf 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -35,7 +35,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +47,7 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; @Slf4j public class BroadcastHandler implements PeerManager.Listener { @@ -79,8 +83,9 @@ public interface Listener { private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger(); private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger(); private final AtomicInteger numPeersForBroadcast = new AtomicInteger(); - + @Nullable private Timer timeoutTimer; + private final Set> sendMessageFutures = new CopyOnWriteArraySet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -104,6 +109,10 @@ public interface Listener { public void broadcast(List broadcastRequests, boolean shutDownRequested, ListeningExecutorService executor) { + if (broadcastRequests.isEmpty()) { + return; + } + List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -162,7 +171,12 @@ public void broadcast(List broadcastRequests, return; } - sendToPeer(connection, broadcastRequestsForConnection, executor); + try { + sendToPeer(connection, broadcastRequestsForConnection, executor); + } catch (RejectedExecutionException e) { + log.error("RejectedExecutionException at broadcast ", e); + cleanup(); + } }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -250,7 +264,7 @@ private void sendToPeer(Connection connection, // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); - + sendMessageFutures.add(future); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(Connection connection) { @@ -324,11 +338,22 @@ private void checkForCompletion() { } private void cleanup() { + if (stopped.get()) { + return; + } + stopped.set(true); + if (timeoutTimer != null) { timeoutTimer.stop(); timeoutTimer = null; } + + sendMessageFutures.stream() + .filter(future -> !future.isCancelled() && !future.isDone()) + .forEach(future -> future.cancel(true)); + sendMessageFutures.clear(); + peerManager.removeListener(this); resultHandler.onCompleted(this); }