Skip to content

Commit

Permalink
Maintain pending futures and cancel them at cleanup.
Browse files Browse the repository at this point in the history
Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
  • Loading branch information
HenrikJannsen committed Jan 4, 2023
1 parent 3e48956 commit 5e29bfe
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -44,6 +47,7 @@
import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class BroadcastHandler implements PeerManager.Listener {
Expand Down Expand Up @@ -79,8 +83,9 @@ public interface Listener {
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();

@Nullable
private Timer timeoutTimer;
private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -104,6 +109,10 @@ public interface Listener {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
if (broadcastRequests.isEmpty()) {
return;
}

List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);

Expand Down Expand Up @@ -162,7 +171,12 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
return;
}

sendToPeer(connection, broadcastRequestsForConnection, executor);
try {
sendToPeer(connection, broadcastRequestsForConnection, executor);
} catch (RejectedExecutionException e) {
log.error("RejectedExecutionException at broadcast ", e);
cleanup();
}
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -250,7 +264,7 @@ private void sendToPeer(Connection connection,
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);

sendMessageFutures.add(future);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
Expand Down Expand Up @@ -324,11 +338,22 @@ private void checkForCompletion() {
}

private void cleanup() {
if (stopped.get()) {
return;
}

stopped.set(true);

if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}

sendMessageFutures.stream()
.filter(future -> !future.isCancelled() && !future.isDone())
.forEach(future -> future.cancel(true));
sendMessageFutures.clear();

peerManager.removeListener(this);
resultHandler.onCompleted(this);
}
Expand Down

0 comments on commit 5e29bfe

Please sign in to comment.