From 491c6a0861c9dd34f2fdc22ab321e751589e8de5 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Sat, 10 Dec 2022 19:19:29 -0500 Subject: [PATCH 1/2] Add onDaoStateBlockCreated method Use onDaoStateBlockCreated instead of onDaoStateHashesChanged to avoid multiple calls when we receive hashes from other nodes. Add daoStateMonitoringService listener after blockchain parsing is completed Signed-off-by: HenrikJannsen --- .../monitoring/DaoStateMonitoringService.java | 17 +++++++---- .../seednode/SeedNodeReportingService.java | 29 +++++++------------ 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java index ad446bac3fa..1d35f3b2371 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java @@ -89,9 +89,14 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe DaoStateNetworkService.Listener { public interface Listener { - void onDaoStateHashesChanged(); + default void onDaoStateHashesChanged() { + } + + default void onCheckpointFail() { + } - void onCheckpointFail(); + default void onDaoStateBlockCreated() { + } } private final DaoStateService daoStateService; @@ -347,7 +352,7 @@ private Optional createDaoStateBlock(Block block) { // We only broadcast after parsing of blockchain is complete if (parseBlockChainComplete) { // We delay broadcast to give peers enough time to have received the block. - // Otherwise they would ignore our data if received block is in future to their local blockchain. + // Otherwise, they would ignore our data if received block is in future to their local blockchain. int delayInSec = 5 + new Random().nextInt(10); if (Config.baseCurrencyNetwork().isRegtest()) { delayInSec = 1; @@ -361,6 +366,7 @@ private Optional createDaoStateBlock(Block block) { duration); accumulatedDuration += duration; numCalls++; + listeners.forEach(Listener::onDaoStateBlockCreated); return Optional.of(daoStateBlock); } @@ -371,15 +377,14 @@ private void processPeersDaoStateHashes(List stateHashes, Optional // If we do not add own hashes during initial parsing we fill the missing hashes from the peer and create // at the last block our own hash. int height = peersHash.getHeight(); - if (!useDaoMonitor && - !findDaoStateBlock(height).isPresent()) { + if (!useDaoMonitor && findDaoStateBlock(height).isEmpty()) { if (daoStateService.getChainHeight() == height) { // At the most recent block we create our own hash optionalDaoStateBlock = daoStateService.getLastBlock() .map(this::createDaoStateBlock) .orElse(findDaoStateBlock(height)); } else { - // Otherwise we create a block from the peers daoStateHash + // Otherwise, we create a block from the peers daoStateHash DaoStateHash daoStateHash = new DaoStateHash(height, peersHash.getHash(), false); DaoStateBlock daoStateBlock = new DaoStateBlock(daoStateHash); daoStateBlockChain.add(daoStateBlock); diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java index f8feb943410..b950b9c75b0 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java @@ -95,10 +95,9 @@ public class SeedNodeReportingService { private final String seedNodeReportingServerUrl; private final DaoStateListener daoStateListener; private final HttpClient httpClient; - - private Timer dataReportTimer; - private final Timer heartBeatTimer; private final ExecutorService executor; + private final Timer heartBeatTimer; + private Timer dataReportTimer; @Inject public SeedNodeReportingService(P2PService p2PService, @@ -128,28 +127,22 @@ public SeedNodeReportingService(P2PService p2PService, heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC); - // We send each time when a new block is received and the DAO hash has been provided (which - // takes a bit after the block arrives). - daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() { - @Override - public void onDaoStateHashesChanged() { - sendBlockRelatedData(); - } - - @Override - public void onCheckpointFail() { - } - }); - - // Independent of the block daoStateListener = new DaoStateListener() { @Override public void onParseBlockChainComplete() { daoFacade.removeBsqStateListener(daoStateListener); dataReportTimer = UserThread.runPeriodically(() -> sendDataReport(), REPORT_DELAY_SEC); sendDataReport(); - sendBlockRelatedData(); + + // We send each time when a new block is received and the DAO hash has been provided (which + // takes a bit after the block arrives). + daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() { + @Override + public void onDaoStateBlockCreated() { + sendBlockRelatedData(); + } + }); } }; daoFacade.addBsqStateListener(daoStateListener); From 3920e81f1c44b24258a22b1265f88624d10394e7 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Sat, 10 Dec 2022 19:26:45 -0500 Subject: [PATCH 2/2] Add try catch at send (to cover case when thread pool is exhausted) Signed-off-by: HenrikJannsen --- .../seednode/SeedNodeReportingService.java | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java index b950b9c75b0..9fc2bc8b201 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java @@ -245,26 +245,30 @@ private void sendDataReport() { } private void sendReportingItems(ReportingItems reportingItems) { - CompletableFuture.runAsync(() -> { - log.info("Send report to monitor server: {}", reportingItems.toString()); - // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. - byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes(); - try { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(seedNodeReportingServerUrl)) - .POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes)) - .header("User-Agent", getMyAddress()) - .build(); - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - if (response.statusCode() != 200) { - log.error("Response error message: {}", response); + try { + CompletableFuture.runAsync(() -> { + log.info("Send report to monitor server: {}", reportingItems.toString()); + // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. + byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes(); + try { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(seedNodeReportingServerUrl)) + .POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes)) + .header("User-Agent", getMyAddress()) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() != 200) { + log.error("Response error message: {}", response); + } + } catch (IOException e) { + log.warn("IOException at sending reporting. {}", e.getMessage()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - log.warn("IOException at sending reporting. {}", e.getMessage()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, executor); + }, executor); + } catch (Throwable t) { + log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); + } } private String getMyAddress() {