Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bundle messages at broadcast #4436

Merged
merged 29 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
badc872
Refactoring
chimp1984 Aug 24, 2020
c493282
Refactoring
chimp1984 Aug 24, 2020
5433707
Bundle broadcast requests
chimp1984 Aug 26, 2020
06f407f
Add todo and curly brackets
chimp1984 Aug 26, 2020
ea50ce8
Merge branch 'improve-network-statistics' into bundle-msg-at-broadcast
chimp1984 Aug 26, 2020
b8152d6
Change BROADCAST_INTERVAL_MS to 2 sec.
chimp1984 Aug 26, 2020
04b6c2a
Don't wrap into BundleOfEnvelopes is only 1 message is used
chimp1984 Aug 26, 2020
8747da4
Merge branch 'improve-network-statistics' into bundle-msg-at-broadcast
chimp1984 Aug 26, 2020
f67a467
LAST_ACTIVITY_AGE_MS should be millisec not sec
chimp1984 Aug 26, 2020
c6c56b3
Increase INTERVAL_SEC to 30-60 sec from 5-35 sec.
chimp1984 Aug 26, 2020
d59a3ad
Add logs
chimp1984 Aug 26, 2020
d8da20a
Remove TODO
chimp1984 Aug 26, 2020
bef4700
Merge branch 'improve-network-statistics' into bundle-msg-at-broadcast
chimp1984 Aug 27, 2020
b1702f7
- Add shutdown handling to broadCaster.
chimp1984 Aug 27, 2020
e780d49
Merge branch 'improve-network-statistics' into bundle-msg-at-broadcast
chimp1984 Aug 27, 2020
6270527
Fix log
chimp1984 Aug 27, 2020
21ff2df
Remove dev logs
chimp1984 Aug 27, 2020
0e8704e
Remove dev logs, remove unread property
chimp1984 Aug 27, 2020
28a665e
Satisfy annoying Codacy bot
chimp1984 Aug 28, 2020
c535ebb
Merge branch 'improve-network-statistics' into bundle-msg-at-broadcast
chimp1984 Aug 28, 2020
be78770
Merge branch 'master_upstream' into bundle-msg-at-broadcast
chimp1984 Aug 29, 2020
366fdb3
Merge remote-tracking branch 'origin/bundle-msg-at-broadcast' into bu…
chimp1984 Aug 29, 2020
b0eea78
Remove unused properties (was added again from a merge failure)
chimp1984 Aug 29, 2020
6accd58
Merge branch 'master_upstream' into bundle-msg-at-broadcast
chimp1984 Aug 30, 2020
0aa97d3
Merge remote-tracking branch 'origin/bundle-msg-at-broadcast' into bu…
chimp1984 Aug 30, 2020
8d13ff8
Add comment about size
chimp1984 Aug 30, 2020
49d212e
Fix tests.
chimp1984 Aug 30, 2020
bc802c8
Change log level to avoid too verbose logs
chimp1984 Aug 31, 2020
f7951d5
Combine if/else branches. Improve comments and variables
chimp1984 Aug 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void requestHashesFromAllConnectedSeedNodes(int fromHeight) {

public void broadcastMyStateHash(StH myStateHash) {
NewStateHashMessage newStateHashMessage = getNewStateHashMessage(myStateHash);
broadcaster.broadcast(newStateHashMessage, networkNode.getNodeAddress(), null);
broadcaster.broadcast(newStateHashMessage, networkNode.getNodeAddress());
}

public void requestHashes(int fromHeight, String peersAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void publishNewBlock(Block block) {
log.info("Publish new block at height={} and block hash={}", block.getHeight(), block.getHash());
RawBlock rawBlock = RawBlock.fromBlock(block);
NewBlockBroadcastMessage newBlockBroadcastMessage = new NewBlockBroadcastMessage(rawBlock);
broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null);
broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
log.debug("We received a new message from peer {} and broadcast it to our peers. extBlockId={}",
connection.getPeersNodeAddressOptional().orElse(null), extBlockId);
receivedBlocks.add(extBlockId);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null), null);
broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null));
listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage));
} else {
log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId);
Expand Down
11 changes: 9 additions & 2 deletions p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package bisq.network.p2p;

import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;

import bisq.common.app.Capabilities;
Expand All @@ -36,14 +37,18 @@

@EqualsAndHashCode(callSuper = true)
@Value
public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission, CapabilityRequiringPayload {
public final class BundleOfEnvelopes extends BroadcastMessage implements ExtendedDataSizePermission, CapabilityRequiringPayload {

private final List<NetworkEnvelope> envelopes;

public BundleOfEnvelopes() {
this(new ArrayList<>(), Version.getP2PMessageVersion());
}

public BundleOfEnvelopes(List<NetworkEnvelope> envelopes) {
this(envelopes, Version.getP2PMessageVersion());
}

public void add(NetworkEnvelope networkEnvelope) {
envelopes.add(networkEnvelope);
}
Expand All @@ -67,7 +72,9 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
.build();
}

public static BundleOfEnvelopes fromProto(protobuf.BundleOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) {
public static BundleOfEnvelopes fromProto(protobuf.BundleOfEnvelopes proto,
NetworkProtoResolver resolver,
int messageVersion) {
List<NetworkEnvelope> envelopes = proto.getEnvelopesList()
.stream()
.map(envelope -> {
Expand Down
122 changes: 48 additions & 74 deletions p2p/src/main/java/bisq/network/p2p/P2PService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.messages.AddDataMessage;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.messages.RefreshOfferMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.MailboxStoragePayload;
Expand All @@ -53,7 +52,6 @@
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Utilities;

import com.google.inject.Inject;

Expand All @@ -77,6 +75,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -122,9 +121,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty();
private final IntegerProperty numConnectedPeers = new SimpleIntegerProperty(0);

private volatile boolean shutDownInProgress;
@Getter
private boolean shutDownComplete;
private final Subscription networkReadySubscription;
private boolean isBootstrapped;
private final KeepAliveManager keepAliveManager;
Expand Down Expand Up @@ -212,48 +208,48 @@ public void onAllServicesInitialized() {
}

public void shutDown(Runnable shutDownCompleteHandler) {
if (!shutDownInProgress) {
shutDownInProgress = true;

shutDownResultHandlers.add(shutDownCompleteHandler);
shutDownResultHandlers.add(shutDownCompleteHandler);

if (p2PDataStorage != null)
p2PDataStorage.shutDown();
// We need to make sure queued up messages are flushed out before we continue shut down other network
// services
if (broadcaster != null) {
broadcaster.shutDown(this::doShutDown);
} else {
doShutDown();
}
}

if (peerManager != null)
peerManager.shutDown();
private void doShutDown() {
if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
}

if (broadcaster != null)
broadcaster.shutDown();
if (peerManager != null) {
peerManager.shutDown();
}

if (requestDataManager != null)
requestDataManager.shutDown();
if (requestDataManager != null) {
requestDataManager.shutDown();
}

if (peerExchangeManager != null)
peerExchangeManager.shutDown();
if (peerExchangeManager != null) {
peerExchangeManager.shutDown();
}

if (keepAliveManager != null)
keepAliveManager.shutDown();
if (keepAliveManager != null) {
keepAliveManager.shutDown();
}

if (networkReadySubscription != null)
networkReadySubscription.unsubscribe();
if (networkReadySubscription != null) {
networkReadySubscription.unsubscribe();
}

if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
});
} else {
shutDownResultHandlers.stream().forEach(Runnable::run);
shutDownComplete = true;
}
if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.forEach(Runnable::run);
});
} else {
log.debug("shutDown already in progress");
if (shutDownComplete) {
shutDownCompleteHandler.run();
} else {
shutDownResultHandlers.add(shutDownCompleteHandler);
}
shutDownResultHandlers.forEach(Runnable::run);
}
}

Expand Down Expand Up @@ -448,7 +444,7 @@ public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {

@Override
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// not handled
// not used
}

///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -672,43 +668,21 @@ private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload

BroadcastHandler.Listener listener = new BroadcastHandler.Listener() {
@Override
public void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts) {
public void onSufficientlyBroadcast(List<Broadcaster.BroadcastRequest> broadcastRequests) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getMessage() instanceof AddDataMessage)
.filter(broadcastRequest -> {
AddDataMessage addDataMessage = (AddDataMessage) broadcastRequest.getMessage();
return addDataMessage.getProtectedStorageEntry().equals(protectedMailboxStorageEntry);
})
.forEach(e -> sendMailboxMessageListener.onStoredInMailbox());
}

@Override
public void onBroadcastedToFirstPeer(BroadcastMessage message) {
// The reason for that check was to separate different callback for different send calls.
// We only want to notify our sendMailboxMessageListener for the calls he is interested in.
if (message instanceof AddDataMessage &&
((AddDataMessage) message).getProtectedStorageEntry().equals(protectedMailboxStorageEntry)) {
// We delay a bit to give more time for sufficient propagation in the P2P network.
// This should help to avoid situations where a user closes the app too early and the msg
// does not arrive.
// We could use onBroadcastCompleted instead but it might take too long if one peer
// is very badly connected.
// TODO We could check for a certain threshold of no. of incoming network_messages of the same msg
// to see how well it is propagated. BitcoinJ uses such an approach for tx propagation.
UserThread.runAfter(() -> {
log.info("Broadcasted to first peer (3 sec. ago): Message = {}", Utilities.toTruncatedString(message));
sendMailboxMessageListener.onStoredInMailbox();
}, 3);
}
}

@Override
public void onBroadcastCompleted(BroadcastMessage message,
int numOfCompletedBroadcasts,
int numOfFailedBroadcasts) {
log.info("Broadcast completed: Sent to {} peers (failed: {}). Message = {}",
numOfCompletedBroadcasts, numOfFailedBroadcasts, Utilities.toTruncatedString(message));
if (numOfCompletedBroadcasts == 0)
sendMailboxMessageListener.onFault("Broadcast completed without any successful broadcast");
}

@Override
public void onBroadcastFailed(String errorMessage) {
// TODO investigate why not sending sendMailboxMessageListener.onFault. Related probably
// to the logic from BroadcastHandler.sendToPeer
public void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast) {
sendMailboxMessageListener.onFault("Message was not sufficiently broadcast.\n" +
"numOfCompletedBroadcasts: " + numOfCompletedBroadcasts + ".\n" +
"numOfFailedBroadcast=" + numOfFailedBroadcast);
}
};
boolean result = p2PDataStorage.addProtectedStorageEntry(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener);
Expand All @@ -721,7 +695,7 @@ public void onBroadcastFailed(String errorMessage) {
log.error("Unexpected state: adding mailbox message that already exists.");
}
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
log.error("Signing at getMailboxDataWithSignedSeqNr failed.");
}
} else {
sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " +
Expand Down
9 changes: 6 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public Capabilities getCapabilities() {

// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName());
log.debug(">> Send networkEnvelope of type: {}", networkEnvelope.getClass().getSimpleName());

if (!stopped) {
if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
Expand Down Expand Up @@ -319,6 +319,8 @@ public void sendMessage(NetworkEnvelope networkEnvelope) {
}
}

// TODO: If msg is BundleOfEnvelopes we should check each individual message for capability and filter out those
// which fail.
public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
boolean result;
if (msg instanceof AddDataMessage) {
Expand Down Expand Up @@ -408,12 +410,13 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkArgument(connection.equals(this));

if (networkEnvelope instanceof BundleOfEnvelopes)
if (networkEnvelope instanceof BundleOfEnvelopes) {
for (NetworkEnvelope current : ((BundleOfEnvelopes) networkEnvelope).getEnvelopes()) {
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection)));
}
else
} else {
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
}
}


Expand Down
Loading