Skip to content

Commit

Permalink
Merge pull request #6451 from HenrikJannsen/avoid_sending_repeated_bl…
Browse files Browse the repository at this point in the history
…ock_related_report_data

Avoid sending repeated block related report data
  • Loading branch information
alejandrogarcia83 authored Dec 11, 2022
2 parents d1dc6c7 + 3920e81 commit 6c7764f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
DaoStateNetworkService.Listener<NewDaoStateHashMessage, GetDaoStateHashesRequest, DaoStateHash> {

public interface Listener {
void onDaoStateHashesChanged();
default void onDaoStateHashesChanged() {
}

default void onCheckpointFail() {
}

void onCheckpointFail();
default void onDaoStateBlockCreated() {
}
}

private final DaoStateService daoStateService;
Expand Down Expand Up @@ -347,7 +352,7 @@ private Optional<DaoStateBlock> createDaoStateBlock(Block block) {
// We only broadcast after parsing of blockchain is complete
if (parseBlockChainComplete) {
// We delay broadcast to give peers enough time to have received the block.
// Otherwise they would ignore our data if received block is in future to their local blockchain.
// Otherwise, they would ignore our data if received block is in future to their local blockchain.
int delayInSec = 5 + new Random().nextInt(10);
if (Config.baseCurrencyNetwork().isRegtest()) {
delayInSec = 1;
Expand All @@ -361,6 +366,7 @@ private Optional<DaoStateBlock> createDaoStateBlock(Block block) {
duration);
accumulatedDuration += duration;
numCalls++;
listeners.forEach(Listener::onDaoStateBlockCreated);
return Optional.of(daoStateBlock);
}

Expand All @@ -371,15 +377,14 @@ private void processPeersDaoStateHashes(List<DaoStateHash> stateHashes, Optional
// If we do not add own hashes during initial parsing we fill the missing hashes from the peer and create
// at the last block our own hash.
int height = peersHash.getHeight();
if (!useDaoMonitor &&
!findDaoStateBlock(height).isPresent()) {
if (!useDaoMonitor && findDaoStateBlock(height).isEmpty()) {
if (daoStateService.getChainHeight() == height) {
// At the most recent block we create our own hash
optionalDaoStateBlock = daoStateService.getLastBlock()
.map(this::createDaoStateBlock)
.orElse(findDaoStateBlock(height));
} else {
// Otherwise we create a block from the peers daoStateHash
// Otherwise, we create a block from the peers daoStateHash
DaoStateHash daoStateHash = new DaoStateHash(height, peersHash.getHash(), false);
DaoStateBlock daoStateBlock = new DaoStateBlock(daoStateHash);
daoStateBlockChain.add(daoStateBlock);
Expand Down
71 changes: 34 additions & 37 deletions seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ public class SeedNodeReportingService {
private final String seedNodeReportingServerUrl;
private final DaoStateListener daoStateListener;
private final HttpClient httpClient;

private Timer dataReportTimer;
private final Timer heartBeatTimer;
private final ExecutorService executor;
private final Timer heartBeatTimer;
private Timer dataReportTimer;

@Inject
public SeedNodeReportingService(P2PService p2PService,
Expand Down Expand Up @@ -128,28 +127,22 @@ public SeedNodeReportingService(P2PService p2PService,

heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);

// We send each time when a new block is received and the DAO hash has been provided (which
// takes a bit after the block arrives).
daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() {
@Override
public void onDaoStateHashesChanged() {
sendBlockRelatedData();
}

@Override
public void onCheckpointFail() {
}
});

// Independent of the block
daoStateListener = new DaoStateListener() {
@Override
public void onParseBlockChainComplete() {
daoFacade.removeBsqStateListener(daoStateListener);
dataReportTimer = UserThread.runPeriodically(() -> sendDataReport(), REPORT_DELAY_SEC);
sendDataReport();

sendBlockRelatedData();

// We send each time when a new block is received and the DAO hash has been provided (which
// takes a bit after the block arrives).
daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() {
@Override
public void onDaoStateBlockCreated() {
sendBlockRelatedData();
}
});
}
};
daoFacade.addBsqStateListener(daoStateListener);
Expand Down Expand Up @@ -252,26 +245,30 @@ private void sendDataReport() {
}

private void sendReportingItems(ReportingItems reportingItems) {
CompletableFuture.runAsync(() -> {
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
try {
CompletableFuture.runAsync(() -> {
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
}
} catch (IOException e) {
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} catch (IOException e) {
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executor);
}, executor);
} catch (Throwable t) {
log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString());
}
}

private String getMyAddress() {
Expand Down

0 comments on commit 6c7764f

Please sign in to comment.